Posted to commits@hive.apache.org by om...@apache.org on 2014/02/13 18:54:33 UTC
svn commit: r1567987 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/test/org/apache/hadoop/hive/ql/io/orc/
serde/src/java/org/apache/hadoop/hive/serde2/ shims/0.20/src/main/java...
Author: omalley
Date: Thu Feb 13 17:54:33 2014
New Revision: 1567987
URL: http://svn.apache.org/r1567987
Log:
HIVE-5728. Make ORC InputFormat/OutputFormat usable outside of Hive. (Daniel
Dai via omalley)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
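
The practical effect of this change is that ORC files can be read and written from a
plain mapreduce-API job without going through Hive. A minimal read-side sketch, not part
of this commit: the driver class, mapper, and paths are illustrative, and the wiring
mirrors TestNewInputOutputFormat further down.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
    import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class OrcReadExample {
      // OrcNewInputFormat hands each row to the mapper as an OrcStruct; keys are NullWritable.
      public static class RowMapper
          extends Mapper<NullWritable, OrcStruct, NullWritable, Text> {
        @Override
        protected void map(NullWritable key, OrcStruct value, Context context)
            throws IOException, InterruptedException {
          context.write(NullWritable.get(), new Text(value.toString()));
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "orc read example");
        job.setJarByClass(OrcReadExample.class);
        job.setInputFormatClass(OrcNewInputFormat.class);
        job.setMapperClass(RowMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // an existing ORC file
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // text output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
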
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Feb 13 17:54:33 2014
@@ -517,8 +517,18 @@ public class HiveConf extends Configurat
// Define the default ORC stripe size
HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size",
256L * 1024 * 1024),
-
- HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
+ HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD(
+ "hive.exec.orc.dictionary.key.size.threshold", 0.8f),
+ // Define the default ORC index stride
+ HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride"
+ , 10000),
+ // Define the default ORC buffer size
+ HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024),
+ // Define the default block padding
+ HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding",
+ true),
+ // Define the default compression codec for ORC file
+ HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Feb 13 17:54:33 2014
@@ -1884,6 +1884,46 @@
</property>
<property>
+ <name>hive.exec.orc.default.stripe.size</name>
+ <value>268435456</value>
+ <description>
+ Define the default ORC stripe size.
+ </description>
+</property>
+
+<property>
+ <name>hive.exec.orc.default.row.index.stride</name>
+ <value>10000</value>
+ <description>
+ Define the default ORC index stride in number of rows.
+ </description>
+</property>
+
+<property>
+ <name>hive.exec.orc.default.buffer.size</name>
+ <value>262144</value>
+ <description>
+ Define the default ORC buffer size in bytes.
+ </description>
+</property>
+
+<property>
+ <name>hive.exec.orc.default.block.padding</name>
+ <value>true</value>
+ <description>
+ Define the default block padding.
+ </description>
+</property>
+
+<property>
+ <name>hive.exec.orc.default.compress</name>
+ <value>ZLIB</value>
+ <description>
+ Define the default compression codec for ORC file.
+ </description>
+</property>
+
+<property>
<name>hive.exec.orc.dictionary.key.size.threshold</name>
<value>0.8</value>
<description>
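
The five new ORC defaults can also be overridden programmatically on the job
configuration before a writer is created. A small fragment, assuming
org.apache.hadoop.conf.Configuration is imported; the values shown are arbitrary, not
recommendations from this patch:

    Configuration conf = new Configuration();
    // Override the new ORC defaults added by this patch (values here are arbitrary).
    conf.setLong("hive.exec.orc.default.stripe.size", 64L * 1024 * 1024);
    conf.setInt("hive.exec.orc.default.row.index.stride", 20000);
    conf.setInt("hive.exec.orc.default.buffer.size", 128 * 1024);
    conf.setBoolean("hive.exec.orc.default.block.padding", false);
    conf.set("hive.exec.orc.default.compress", "SNAPPY");
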
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Thu Feb 13 17:54:33 2014
@@ -104,14 +104,6 @@ public final class OrcFile {
public static final String ENABLE_INDEXES = "orc.create.index";
public static final String BLOCK_PADDING = "orc.block.padding";
- static final long DEFAULT_STRIPE_SIZE =
- HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal;
- static final CompressionKind DEFAULT_COMPRESSION_KIND =
- CompressionKind.ZLIB;
- static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
- static final int DEFAULT_ROW_INDEX_STRIDE = 10000;
- static final boolean DEFAULT_BLOCK_PADDING = true;
-
// unused
private OrcFile() {}
@@ -140,10 +132,10 @@ public final class OrcFile {
private FileSystem fileSystemValue = null;
private ObjectInspector inspectorValue = null;
private long stripeSizeValue;
- private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
- private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
- private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
- private CompressionKind compressValue = DEFAULT_COMPRESSION_KIND;
+ private int rowIndexStrideValue;
+ private int bufferSizeValue;
+ private boolean blockPaddingValue;
+ private CompressionKind compressValue;
private MemoryManager memoryManagerValue;
private Version versionValue;
@@ -152,7 +144,22 @@ public final class OrcFile {
memoryManagerValue = getMemoryManager(conf);
stripeSizeValue =
conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
- DEFAULT_STRIPE_SIZE);
+ HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal);
+ rowIndexStrideValue =
+ conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE
+ .varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.defaultIntVal);
+ bufferSizeValue =
+ conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname,
+ HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.defaultIntVal);
+ blockPaddingValue =
+ conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING
+ .varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING
+ .defaultBoolVal);
+ compressValue =
+ CompressionKind.valueOf(conf.get(HiveConf.ConfVars
+ .HIVE_ORC_DEFAULT_COMPRESS.varname,
+ HiveConf.ConfVars
+ .HIVE_ORC_DEFAULT_COMPRESS.defaultVal));
String versionName =
conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
if (versionName == null) {
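
With the compile-time constants removed, WriterOptions now seeds every default (stripe
size, index stride, buffer size, block padding, compression) from the Configuration. A
sketch of creating a writer directly, assuming the same imports used in
OrcNewOutputFormat.java later in this commit; the output path and the empty struct
inspector are placeholders:

    Configuration conf = new Configuration();
    conf.set("hive.exec.orc.default.compress", "SNAPPY"); // example override; ZLIB stays the default
    ObjectInspector inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
        new ArrayList<String>(), new ArrayList<ObjectInspector>());
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(conf).inspector(inspector));
    writer.close();
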
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Feb 13 17:54:33 2014
@@ -44,9 +44,8 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.orc.Metadata;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileGenerator;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator;
import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -63,7 +62,6 @@ import org.apache.hadoop.mapred.InputFor
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
@@ -99,8 +97,8 @@ public class OrcInputFormat implements
private static final double MIN_INCLUDED_LOCATION = 0.80;
private static class OrcRecordReader
- implements RecordReader<NullWritable, OrcStruct> {
- private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+ implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> {
+ private final RecordReader reader;
private final long offset;
private final long length;
private final int numColumns;
@@ -111,10 +109,7 @@ public class OrcInputFormat implements
long offset, long length) throws IOException {
List<OrcProto.Type> types = file.getTypes();
numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
- boolean[] includedColumns = findIncludedColumns(types, conf);
- String[] columnNames = getIncludedColumnNames(types, includedColumns, conf);
- SearchArgument sarg = createSarg(types, conf);
- this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
+ this.reader = createReaderFromFile(file, conf, offset, length);
this.offset = offset;
this.length = length;
}
@@ -155,6 +150,19 @@ public class OrcInputFormat implements
return progress;
}
}
+
+ static RecordReader createReaderFromFile(
+ Reader file, Configuration conf, long offset, long length)
+ throws IOException {
+ List<OrcProto.Type> types = file.getTypes();
+ boolean[] includedColumns = findIncludedColumns(types, conf);
+ String[] columnNames = getIncludedColumnNames(types, includedColumns,
+ conf);
+ SearchArgument sarg = createSarg(types, conf);
+ RecordReader reader =
+ file.rows(offset, length, includedColumns, sarg, columnNames);
+ return reader;
+ }
private static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
@@ -244,14 +252,15 @@ public class OrcInputFormat implements
}
}
+ @SuppressWarnings("unchecked")
@Override
- public RecordReader<NullWritable, OrcStruct>
+ public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
if (isVectorMode(conf)) {
- RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
+ org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
reporter);
- return (RecordReader) vorr;
+ return (org.apache.hadoop.mapred.RecordReader) vorr;
}
FileSplit fSplit = (FileSplit)inputSplit;
reporter.setStatus(fSplit.toString());
@@ -308,7 +317,7 @@ public class OrcInputFormat implements
* @param conf The configuration of the job
* @return the list of input {@link Path}s for the map-reduce job.
*/
- static Path[] getInputPaths(JobConf conf) throws IOException {
+ static Path[] getInputPaths(Configuration conf) throws IOException {
String dirs = conf.get("mapred.input.dir");
if (dirs == null) {
throw new IOException("Configuration mapred.input.dir is not defined.");
@@ -326,10 +335,41 @@ public class OrcInputFormat implements
* the different worker threads.
*/
static class Context {
+ static class FileSplitInfo {
+ FileSplitInfo(Path file, long start, long length, String[] hosts,
+ FileMetaInfo fileMetaInfo) {
+ this.file = file;
+ this.start = start;
+ this.length = length;
+ this.hosts = hosts;
+ this.fileMetaInfo = fileMetaInfo;
+ }
+ Path getPath() {
+ return file;
+ }
+ long getStart() {
+ return start;
+ }
+ long getLength() {
+ return length;
+ }
+ String[] getLocations() {
+ return hosts;
+ }
+ FileMetaInfo getFileMetaInfo() {
+ return fileMetaInfo;
+ }
+ private Path file;
+ private long start;
+ private long length;
+ private String[] hosts;
+ FileMetaInfo fileMetaInfo;
+ }
private final Configuration conf;
private static Cache<Path, FileInfo> footerCache;
private final ExecutorService threadPool;
- private final List<OrcSplit> splits = new ArrayList<OrcSplit>(10000);
+ private final List<FileSplitInfo> splits =
+ new ArrayList<FileSplitInfo>(10000);
private final List<Throwable> errors = new ArrayList<Throwable>();
private final HadoopShims shims = ShimLoader.getHadoopShims();
private final long maxSize;
@@ -378,7 +418,7 @@ public class OrcInputFormat implements
* the back.
* @result the Nth file split
*/
- OrcSplit getResult(int index) {
+ FileSplitInfo getResult(int index) {
if (index >= 0) {
return splits.get(index);
} else {
@@ -556,8 +596,8 @@ public class OrcInputFormat implements
if(locations.length == 1 && file.getLen() < context.maxSize) {
String[] hosts = locations[0].getHosts();
synchronized (context.splits) {
- context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
- hosts, fileMetaInfo));
+ context.splits.add(new Context.FileSplitInfo(file.getPath(), 0,
+ file.getLen(), hosts, fileMetaInfo));
}
} else {
// if it requires a compute task
@@ -643,8 +683,8 @@ public class OrcInputFormat implements
hostList.toArray(hosts);
}
synchronized (context.splits) {
- context.splits.add(new OrcSplit(file.getPath(), offset, length,
- hosts, fileMetaInfo));
+ context.splits.add(new Context.FileSplitInfo(file.getPath(), offset,
+ length, hosts, fileMetaInfo));
}
}
@@ -851,35 +891,45 @@ public class OrcInputFormat implements
}
}
+ static List<Context.FileSplitInfo> generateSplitsInfo(Configuration conf)
+ throws IOException {
+ // use threads to resolve directories into splits
+ Context context = new Context(conf);
+ for(Path dir: getInputPaths(conf)) {
+ FileSystem fs = dir.getFileSystem(conf);
+ context.schedule(new FileGenerator(context, fs, dir));
+ }
+ context.waitForTasks();
+ // deal with exceptions
+ if (!context.errors.isEmpty()) {
+ List<IOException> errors =
+ new ArrayList<IOException>(context.errors.size());
+ for(Throwable th: context.errors) {
+ if (th instanceof IOException) {
+ errors.add((IOException) th);
+ } else {
+ throw new RuntimeException("serious problem", th);
+ }
+ }
+ throw new InvalidInputException(errors);
+ }
+ if (context.cacheStripeDetails) {
+ LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
+ + context.numFilesCounter.get());
+ }
+ return context.splits;
+ }
@Override
public InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
- // use threads to resolve directories into splits
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
- Context context = new Context(job);
- for(Path dir: getInputPaths(job)) {
- FileSystem fs = dir.getFileSystem(job);
- context.schedule(new FileGenerator(context, fs, dir));
- }
- context.waitForTasks();
- // deal with exceptions
- if (!context.errors.isEmpty()) {
- List<IOException> errors =
- new ArrayList<IOException>(context.errors.size());
- for(Throwable th: context.errors) {
- if (th instanceof IOException) {
- errors.add((IOException) th);
- } else {
- throw new RuntimeException("serious problem", th);
- }
- }
- throw new InvalidInputException(errors);
- }
- InputSplit[] result = new InputSplit[context.splits.size()];
- context.splits.toArray(result);
- if (context.cacheStripeDetails) {
- LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
- + context.numFilesCounter.get());
+ List<OrcInputFormat.Context.FileSplitInfo> splits =
+ OrcInputFormat.generateSplitsInfo(job);
+ InputSplit[] result = new InputSplit[splits.size()];
+ for (int i=0;i<splits.size();i++) {
+ OrcInputFormat.Context.FileSplitInfo split = splits.get(i);
+ result[i] = new OrcSplit(split.getPath(), split.getStart(),
+ split.getLength(), split.getLocations(), split.getFileMetaInfo());
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
return result;
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** An InputFormat for ORC files. Keys are meaningless,
+ * value is the OrcStruct object */
+public class OrcNewInputFormat extends InputFormat<NullWritable, OrcStruct>{
+ private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ private static final String CLASS_NAME = ReaderImpl.class.getName();
+
+ @Override
+ public RecordReader<NullWritable, OrcStruct> createRecordReader(
+ InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ Path path = fileSplit.getPath();
+ FileSystem fs = path.getFileSystem(ShimLoader.getHadoopShims()
+ .getConfiguration(context));
+ return new OrcRecordReader(OrcFile.createReader(fs, path),
+ ShimLoader.getHadoopShims().getConfiguration(context),
+ fileSplit.getStart(), fileSplit.getLength());
+ }
+
+ private static class OrcRecordReader
+ extends RecordReader<NullWritable, OrcStruct> {
+ private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+ private final int numColumns;
+ OrcStruct value;
+ private float progress = 0.0f;
+
+ OrcRecordReader(Reader file, Configuration conf,
+ long offset, long length) throws IOException {
+ List<OrcProto.Type> types = file.getTypes();
+ numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+ value = new OrcStruct(numColumns);
+ this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset,
+ length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException,
+ InterruptedException {
+ return NullWritable.get();
+ }
+
+
+ @Override
+ public OrcStruct getCurrentValue() throws IOException,
+ InterruptedException {
+ return value;
+ }
+
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return progress;
+ }
+
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ }
+
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (reader.hasNext()) {
+ reader.next(value);
+ progress = reader.getProgress();
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
+ List<OrcInputFormat.Context.FileSplitInfo> splits =
+ OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
+ .getConfiguration(jobContext));
+ List<InputSplit> result = new ArrayList<InputSplit>();
+ for (OrcInputFormat.Context.FileSplitInfo split : splits) {
+ FileSplit newSplit = new OrcNewSplit(split.getPath(),
+ split.getStart(), split.getLength(), split.getLocations(),
+ split.getFileMetaInfo());
+ result.add(newSplit);
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
+ return result;
+ }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/** An OutputFormat that writes ORC files. */
+public class OrcNewOutputFormat extends
+ FileOutputFormat<NullWritable, OrcSerdeRow> {
+
+ private static class OrcRecordWriter
+ extends RecordWriter<NullWritable, OrcSerdeRow> {
+ private Writer writer = null;
+ private final Path path;
+ private final OrcFile.WriterOptions options;
+ OrcRecordWriter(Path path, OrcFile.WriterOptions options) {
+ this.path = path;
+ this.options = options;
+ }
+ @Override
+ public void write(NullWritable key, OrcSerdeRow row)
+ throws IOException, InterruptedException {
+ if (writer == null) {
+ options.inspector(row.getInspector());
+ writer = OrcFile.createWriter(path, options);
+ }
+ writer.addRow(row.getRow());
+ }
+
+ @Override
+ public void close(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ if (writer == null) {
+ // a row with no columns
+ ObjectInspector inspector = ObjectInspectorFactory.
+ getStandardStructObjectInspector(new ArrayList<String>(),
+ new ArrayList<ObjectInspector>());
+ options.inspector(inspector);
+ writer = OrcFile.createWriter(path, options);
+ }
+ writer.close();
+ }
+ }
+
+ @Override
+ public RecordWriter getRecordWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ Path file = getDefaultWorkFile(context, "");
+ return new
+ OrcRecordWriter(file, OrcFile.writerOptions(
+ ShimLoader.getHadoopShims().getConfiguration(context)));
+ }
+}
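
On the write side a job sets OrcNewOutputFormat and emits OrcSerdeRow values produced by
OrcSerde. A mapper-side sketch modelled on OrcTestMapper2 in the new test below; the
struct<a:int,b:string> schema and the comma-separated input are illustrative only, and
the usual imports from TestNewInputOutputFormat are assumed:

    // Sketch only: schema and input parsing are illustrative.
    public static class CsvToOrcMapper extends Mapper<Object, Text, NullWritable, Writable> {
      private final TypeInfo typeInfo =
          TypeInfoUtils.getTypeInfoFromTypeString("struct<a:int,b:string>");
      private final ObjectInspector inspector =
          TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
      private final OrcSerde serde = new OrcSerde();

      @Override
      protected void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        String[] items = value.toString().split(",");
        List<Object> struct = new ArrayList<Object>(2);
        struct.add(Integer.parseInt(items[0]));
        struct.add(items[1]);
        // serialize() returns an OrcSerdeRow, which OrcNewOutputFormat writes through the ORC Writer.
        context.write(NullWritable.get(), serde.serialize(struct, inspector));
      }
    }

    // Driver side:
    //   job.setOutputFormatClass(OrcNewOutputFormat.class);
    //   job.setOutputKeyClass(NullWritable.class);
    //   job.setOutputValueClass(OrcSerdeRow.class);
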
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * OrcFileSplit. Holds file meta info
+ *
+ */
+public class OrcNewSplit extends FileSplit {
+ private Reader.FileMetaInfo fileMetaInfo;
+ private boolean hasFooter;
+
+ protected OrcNewSplit(){
+ //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
+ //This constructor is used to create the object and then call readFields()
+ // so just pass nulls to this super constructor.
+ super(null, 0, 0, (String[])null);
+ }
+
+ public OrcNewSplit(Path path, long offset, long length, String[] hosts,
+ FileMetaInfo fileMetaInfo) {
+ super(path, offset, length, hosts);
+ this.fileMetaInfo = fileMetaInfo;
+ hasFooter = this.fileMetaInfo != null;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ //serialize path, offset, length using FileSplit
+ super.write(out);
+
+ // Whether footer information follows.
+ out.writeBoolean(hasFooter);
+
+ if (hasFooter) {
+ // serialize FileMetaInfo fields
+ Text.writeString(out, fileMetaInfo.compressionType);
+ WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+ WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
+
+ // serialize FileMetaInfo field footer
+ ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
+ footerBuff.reset();
+
+ // write length of buffer
+ WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
+ out.write(footerBuff.array(), footerBuff.position(),
+ footerBuff.limit() - footerBuff.position());
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ //deserialize path, offset, length using FileSplit
+ super.readFields(in);
+
+ hasFooter = in.readBoolean();
+
+ if (hasFooter) {
+ // deserialize FileMetaInfo fields
+ String compressionType = Text.readString(in);
+ int bufferSize = WritableUtils.readVInt(in);
+ int metadataSize = WritableUtils.readVInt(in);
+
+ // deserialize FileMetaInfo field footer
+ int footerBuffSize = WritableUtils.readVInt(in);
+ ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
+ in.readFully(footerBuff.array(), 0, footerBuffSize);
+
+ fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff);
+ }
+ }
+
+ public FileMetaInfo getFileMetaInfo(){
+ return fileMetaInfo;
+ }
+
+ public boolean hasFooter() {
+ return hasFooter;
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Thu Feb 13 17:54:33 2014
@@ -43,7 +43,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.Writable;
-final class OrcStruct implements Writable {
+final public class OrcStruct implements Writable {
private Object[] fields;
@@ -461,7 +461,7 @@ final class OrcStruct implements Writabl
}
}
- static ObjectInspector createObjectInspector(TypeInfo info) {
+ static public ObjectInspector createObjectInspector(TypeInfo info) {
switch (info.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) {
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Thu Feb 13 17:54:33 2014
@@ -428,7 +428,7 @@ public class TestInputOutputFormat {
new OrcInputFormat.SplitGenerator(context, fs,
fs.getFileStatus(new Path("/a/file")), null);
splitter.createSplit(0, 200, null);
- FileSplit result = context.getResult(-1);
+ OrcInputFormat.Context.FileSplitInfo result = context.getResult(-1);
assertEquals(0, result.getStart());
assertEquals(200, result.getLength());
assertEquals("/a/file", result.getPath().toString());
@@ -477,7 +477,7 @@ public class TestInputOutputFormat {
}
throw new IOException("Errors during splitting");
}
- FileSplit result = context.getResult(0);
+ OrcInputFormat.Context.FileSplitInfo result = context.getResult(0);
assertEquals(3, result.getStart());
assertEquals(497, result.getLength());
result = context.getResult(1);
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.assertFalse;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hive.common.util.HiveTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestNewInputOutputFormat {
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+
+ Configuration conf;
+ FileSystem localFs;
+
+ @Before
+ public void setup() throws Exception {
+ conf = new Configuration();
+ conf.set("mapred.job.tracker", "local");
+ conf.set("fs.default.name", "local");
+ localFs = FileSystem.get(conf);
+ }
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ public static class OrcTestMapper1 extends
+ Mapper<Object, Writable, Text, Text> {
+ @Override
+ public void map(Object key, Writable value, Context context)
+ throws IOException, InterruptedException {
+ context.write(null, new Text(value.toString()));
+ }
+ }
+
+ @Test
+ // Test regular inputformat
+ public void testNewInputFormat() throws Exception {
+ Job job = new Job(conf, "orc test");
+ job.setInputFormatClass(OrcNewInputFormat.class);
+ job.setJarByClass(TestNewInputOutputFormat.class);
+ job.setMapperClass(OrcTestMapper1.class);
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileInputFormat.addInputPath(job,
+ new Path(HiveTestUtils.getFileFromClasspath("orc-file-11-format.orc")));
+ Path outputPath = new Path(workDir,
+ "TestOrcFile." + testCaseName.getMethodName() + ".txt");
+ localFs.delete(outputPath, true);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ boolean result = job.waitForCompletion(true);
+ assertTrue(result);
+ Path outputFilePath = new Path(outputPath, "part-m-00000");
+
+ assertTrue(localFs.exists(outputFilePath));
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(localFs.open(outputFilePath)));
+ int count=0;
+ String line;
+ String lastLine=null;
+ while ((line=reader.readLine()) != null) {
+ count++;
+ lastLine = line;
+ }
+ reader.close();
+ assertEquals(count, 7500);
+ assertEquals(lastLine, "{true, 100, 2048, 65536," +
+ " 9223372036854775807, 2.0, -5.0" +
+ ", , bye, {[{1, bye}, {2, sigh}]}, [{100000000, cat}," +
+ " {-100000, in}, {1234, hat}]," +
+ " {chani={5, chani}, mauddib={1, mauddib}}," +
+ " 2000-03-12 15:00:01.0, 12345678.6547457}");
+ localFs.delete(outputPath, true);
+ }
+
+ public static class OrcTestMapper2 extends Mapper<Object, Text, Object, Writable> {
+ private final TypeInfo typeInfo = TypeInfoUtils
+ .getTypeInfoFromTypeString("struct<a:int,b:string>");
+ private final ObjectInspector oip = TypeInfoUtils
+ .getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
+ private final OrcSerde serde = new OrcSerde();
+ private Writable row;
+ @Override
+ public void map(Object key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String[] items = value.toString().split(",");
+ List<Object> struct = new ArrayList<Object>(2);
+ struct.add(0, Integer.parseInt(items[0]));
+ struct.add(1, items[1]);
+ row = serde.serialize(struct, oip);
+ context.write(null, row);
+ }
+ }
+
+ @Test
+ //Test regular outputformat
+ public void testNewOutputFormat() throws Exception {
+ int rownum=1000;
+
+ Path inputPath = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".txt");
+ Path outputPath = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".orc");
+ localFs.delete(outputPath, true);
+ PrintWriter pw = new PrintWriter(
+ new OutputStreamWriter(localFs.create(inputPath)));
+ Random r = new Random(1000L);
+ boolean firstRow = true;
+ int firstIntValue = 0;
+ String firstStringValue = null;
+ for (int i=0;i<rownum;i++) {
+ int intValue = r.nextInt();
+ String stringValue = UUID.randomUUID().toString();
+ if (firstRow) {
+ firstRow = false;
+ firstIntValue = intValue;
+ firstStringValue = stringValue;
+ }
+ pw.println(intValue + "," + stringValue);
+ }
+ pw.close();
+
+ Job job = new Job(conf, "orc test");
+ job.setOutputFormatClass(OrcNewOutputFormat.class);
+ job.setJarByClass(TestNewInputOutputFormat.class);
+ job.setMapperClass(OrcTestMapper2.class);
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Writable.class);
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ boolean result = job.waitForCompletion(true);
+ assertTrue(result);
+
+ Path outputFilePath = new Path(outputPath, "part-m-00000");
+ assertTrue(localFs.exists(outputFilePath));
+ Reader reader = OrcFile.createReader(localFs, outputFilePath);
+ assertTrue(reader.getNumberOfRows() == rownum);
+ assertEquals(reader.getCompression(), CompressionKind.ZLIB);
+ StructObjectInspector soi =
+ (StructObjectInspector)reader.getObjectInspector();
+ StructTypeInfo ti =
+ (StructTypeInfo)TypeInfoUtils.getTypeInfoFromObjectInspector(soi);
+ assertEquals(((PrimitiveTypeInfo)ti.getAllStructFieldTypeInfos().get(0))
+ .getPrimitiveCategory(),
+ PrimitiveObjectInspector.PrimitiveCategory.INT);
+ assertEquals(((PrimitiveTypeInfo)ti.getAllStructFieldTypeInfos().get(1))
+ .getPrimitiveCategory(),
+ PrimitiveObjectInspector.PrimitiveCategory.STRING);
+
+ RecordReader rows = reader.rows(null);
+ Object row = rows.next(null);
+
+ IntWritable intWritable = (IntWritable)soi.getStructFieldData(row,
+ soi.getAllStructFieldRefs().get(0));
+ Text text = (Text)soi.getStructFieldData(row,
+ soi.getAllStructFieldRefs().get(1));
+
+ assertEquals(intWritable.get(), firstIntValue);
+ assertEquals(text.toString(), firstStringValue);
+
+ localFs.delete(outputPath, true);
+ }
+
+ @Test
+ //Test outputformat with compression
+ public void testNewOutputFormatWithCompression() throws Exception {
+ conf.set("hive.exec.orc.default.compress", "SNAPPY");
+
+ Path inputPath = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".txt");
+ Path outputPath = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".orc");
+ localFs.delete(outputPath, true);
+ PrintWriter pw = new PrintWriter(
+ new OutputStreamWriter(localFs.create(inputPath)));
+ pw.println("1,hello");
+ pw.println("2,world");
+ pw.close();
+
+ Job job = new Job(conf, "orc test");
+ job.setOutputFormatClass(OrcNewOutputFormat.class);
+ job.setJarByClass(TestNewInputOutputFormat.class);
+ job.setMapperClass(OrcTestMapper2.class);
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(OrcSerdeRow.class);
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ boolean result = job.waitForCompletion(true);
+ assertTrue(result);
+
+ Path outputFilePath = new Path(outputPath, "part-m-00000");
+ Reader reader = OrcFile.createReader(localFs, outputFilePath);
+ assertEquals(reader.getCompression(), CompressionKind.SNAPPY);
+
+ localFs.delete(outputPath, true);
+ }
+
+ public static class OrcTestMapper3 extends
+ Mapper<Object, Text, IntWritable, Text> {
+ @Override
+ public void map(Object key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String items[] = value.toString().split("\\s+");
+ context.write(new IntWritable(items.length), value);
+ }
+ }
+
+ public static class OrcTestReducer3 extends
+ Reducer<IntWritable, Text, NullWritable, Writable> {
+ final static TypeInfo typeInfo =
+ TypeInfoUtils.getTypeInfoFromTypeString(
+ "struct<length:int,count:int,list:array" +
+ "<struct<lastword:string,lastwordlength:int>>," +
+ "wordcounts:map<string,int>>");
+ private final ObjectInspector oip =
+ TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
+ private final OrcSerde serde = new OrcSerde();
+ private Writable row;
+
+ @Override
+ public void reduce(IntWritable key, Iterable<Text> values, Context context)
+ throws IOException, InterruptedException {
+ List<String> lastwords = new ArrayList<String>();
+ Map<String, Integer> wordCounts = new HashMap<String, Integer>();
+ int count = 0;
+ for (Text val : values) {
+ String[] items = val.toString().toLowerCase().split("\\s+");
+ lastwords.add(items[items.length-1]);
+ for (String item : items) {
+ if (wordCounts.containsKey(item)) {
+ wordCounts.put(item, wordCounts.get(item)+1);
+ } else {
+ wordCounts.put(item, 1);
+ }
+ }
+ count++;
+ }
+ List<Object> struct = new ArrayList<Object>(4);
+ struct.add(0, key.get());
+ struct.add(1, count);
+ List<List<Object>> lastWordInfoList = new ArrayList<List<Object>>();
+ for (String word : lastwords) {
+ List<Object> info = new ArrayList<Object>(2);
+ info.add(0, word);
+ info.add(1, word.length());
+ lastWordInfoList.add(info);
+ }
+ struct.add(2, lastWordInfoList);
+ struct.add(3, wordCounts);
+ row = serde.serialize(struct, oip);
+ context.write(NullWritable.get(), row);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ //Test outputformat with complex data type, and with reduce
+ public void testNewOutputFormatComplex() throws Exception {
+ Path inputPath = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".txt");
+ Path outputPath = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".orc");
+ localFs.delete(outputPath, true);
+ PrintWriter pw = new PrintWriter(
+ new OutputStreamWriter(localFs.create(inputPath)));
+ pw.println("I have eaten");
+ pw.println("the plums");
+ pw.println("that were in");
+ pw.println("the icebox");
+ pw.println("and which");
+ pw.println("you were probably");
+ pw.println("saving");
+ pw.println("for breakfast");
+ pw.println("Forgive me");
+ pw.println("they were delicious");
+ pw.println("so sweet");
+ pw.println("and so cold");
+ pw.close();
+
+ Job job = new Job(conf, "orc test");
+ job.setOutputFormatClass(OrcNewOutputFormat.class);
+ job.setJarByClass(TestNewInputOutputFormat.class);
+ job.setMapperClass(OrcTestMapper3.class);
+ job.setReducerClass(OrcTestReducer3.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(OrcSerdeRow.class);
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ boolean result = job.waitForCompletion(true);
+ assertTrue(result);
+
+ Path outputFilePath = new Path(outputPath, "part-r-00000");
+ Reader reader = OrcFile.createReader(localFs, outputFilePath);
+
+ RecordReader rows = reader.rows(null);
+ ObjectInspector orcOi = reader.getObjectInspector();
+ ObjectInspector stoi = TypeInfoUtils
+ .getStandardJavaObjectInspectorFromTypeInfo(OrcTestReducer3.typeInfo);
+ ObjectInspectorConverters.Converter converter = ObjectInspectorConverters
+ .getConverter(orcOi, stoi);
+
+ Object row = rows.next(null);
+ List<Object> converted = (List<Object>)converter.convert(row);
+ assertEquals(converted.get(0), 1);
+ assertEquals(converted.get(1), 1);
+ List<Object> list = (List<Object>)converted.get(2);
+ assertEquals(list.size(), 1);
+ assertEquals(((List<Object>)list.get(0)).get(0), "saving");
+ assertEquals(((List<Object>)list.get(0)).get(1), 6);
+ Map<String, Integer> map = (Map<String, Integer>)converted.get(3);
+ assertEquals(map.size(), 1);
+ assertEquals(map.get("saving"), new Integer(1));
+
+ row = rows.next(null);
+ converted = (List<Object>)converter.convert(row);
+ assertEquals(converted.get(0), 2);
+ assertEquals(converted.get(1), 6);
+ list = (List<Object>)converted.get(2);
+ assertEquals(list.size(), 6);
+ assertEquals(((List<Object>)list.get(0)).get(0), "plums");
+ assertEquals(((List<Object>)list.get(0)).get(1), 5);
+ map = (Map<String, Integer>)converted.get(3);
+ assertEquals(map.size(), 11);
+ assertEquals(map.get("the"), new Integer(2));
+
+ row = rows.next(null);
+ converted = (List<Object>)converter.convert(row);
+ assertEquals(converted.get(0), 3);
+ assertEquals(converted.get(1), 5);
+ list = (List<Object>)converted.get(2);
+ assertEquals(list.size(), 5);
+ assertEquals(((List<Object>)list.get(0)).get(0), "eaten");
+ assertEquals(((List<Object>)list.get(0)).get(1), 5);
+ map = (Map<String, Integer>)converted.get(3);
+ assertEquals(map.size(), 13);
+ assertEquals(map.get("were"), new Integer(3));
+
+ assertFalse(rows.hasNext());
+
+ localFs.delete(outputPath, true);
+ }
+
+ @Test
+ // Test inputformat with column prune
+ public void testNewInputFormatPruning() throws Exception {
+ conf.set("hive.io.file.read.all.columns", "false");
+ conf.set("hive.io.file.readcolumn.ids", "1,3");
+ Job job = new Job(conf, "orc test");
+ job.setInputFormatClass(OrcNewInputFormat.class);
+ job.setJarByClass(TestNewInputOutputFormat.class);
+ job.setMapperClass(OrcTestMapper1.class);
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileInputFormat.addInputPath(job, new Path(HiveTestUtils
+ .getFileFromClasspath("orc-file-11-format.orc")));
+ Path outputPath = new Path(workDir, "TestOrcFile." +
+ testCaseName.getMethodName() + ".txt");
+ localFs.delete(outputPath, true);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ boolean result = job.waitForCompletion(true);
+ assertTrue(result);
+ Path outputFilePath = new Path(outputPath, "part-m-00000");
+
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(localFs.open(outputFilePath)));
+ String line=reader.readLine();
+
+ assertEquals(line, "{null, 1, null, 65536, null, null, null, " +
+ "null, null, null, null, null, null, null}");
+
+ localFs.delete(outputPath, true);
+ }
+}
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java Thu Feb 13 17:54:33 2014
@@ -31,9 +31,9 @@ import org.apache.hadoop.util.StringUtil
public final class ColumnProjectionUtils {
public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
+ public static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
- private static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
private static final boolean READ_ALL_COLUMNS_DEFAULT = true;
/**
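
Making READ_ALL_COLUMNS public lets callers outside Hive request column projection before
submitting a read job. A small fragment; the column ids chosen here are arbitrary, and
the same settings are exercised by testNewInputFormatPruning above:

    Configuration conf = new Configuration();
    // Read only columns 1 and 3; columns that are not selected come back as nulls
    // in the materialized OrcStruct (see the pruning test output above).
    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);  // hive.io.file.read.all.columns
    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "1,3"); // hive.io.file.readcolumn.ids
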
Modified: hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Feb 13 17:54:33 2014
@@ -773,4 +773,8 @@ public class Hadoop20Shims implements Ha
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
}
+ @Override
+ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
+ return context.getConfiguration();
+ }
}
Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Feb 13 17:54:33 2014
@@ -410,4 +410,9 @@ public class Hadoop20SShims extends Hado
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
}
+
+ @Override
+ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
+ return context.getConfiguration();
+ }
}
Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Feb 13 17:54:33 2014
@@ -557,4 +557,9 @@ public class Hadoop23Shims extends Hadoo
ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
return ret;
}
+
+ @Override
+ public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
+ return context.getConfiguration();
+ }
}
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Feb 13 17:54:33 2014
@@ -520,4 +520,10 @@ public interface HadoopShims {
public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
public Map<String, String> getHadoopConfNames();
+
+
+ /**
+ * Get configuration from JobContext
+ */
+ public Configuration getConfiguration(JobContext context);
}
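
The new shim method is what lets the mapreduce-API classes above obtain a Configuration
from a JobContext or TaskAttemptContext through the shims layer instead of calling the
context directly. Usage as in OrcNewInputFormat.getSplits, where jobContext is the
JobContext handed to that method:

    Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext);
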