You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2014/02/13 18:54:33 UTC

svn commit: r1567987 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/test/org/apache/hadoop/hive/ql/io/orc/ serde/src/java/org/apache/hadoop/hive/serde2/ shims/0.20/src/main/java...

Author: omalley
Date: Thu Feb 13 17:54:33 2014
New Revision: 1567987

URL: http://svn.apache.org/r1567987
Log:
HIVE-5728. Make ORC InputFormat/OutputFormat usable outside of Hive. (Daniel
Dai via omalley)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
    hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
    hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Feb 13 17:54:33 2014
@@ -517,8 +517,18 @@ public class HiveConf extends Configurat
     // Define the default ORC stripe size
     HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size",
         256L * 1024 * 1024),
-
-    HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
+    HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD(
+        "hive.exec.orc.dictionary.key.size.threshold", 0.8f),
+    // Define the default ORC index stride
+    HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride"
+        , 10000),
+    // Define the default ORC buffer size
+    HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024),
+    // Define the default block padding
+    HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding",
+        true),
+    // Define the default compression codec for ORC file
+    HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB"),
 
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Thu Feb 13 17:54:33 2014
@@ -1884,6 +1884,46 @@
 </property>
 
 <property>
+  <name>hive.exec.orc.default.stripe.size</name>
+  <value>268435456</value>
+  <description>
+    Define the default ORC stripe size.
+  </description>
+</property>
+
+<property>
+  <name>hive.exec.orc.default.row.index.stride</name>
+  <value>10000</value>
+  <description>
+    Define the default ORC index stride in number of rows.
+  </description>
+</property>
+
+<property>
+  <name>hive.exec.orc.default.buffer.size</name>
+  <value>262144</value>
+  <description>
+    Define the default ORC buffer size in bytes.
+  </description>
+</property>
+
+<property>
+  <name>hive.exec.orc.default.block.padding</name>
+  <value>true</value>
+  <description>
+    Define the default block padding.
+  </description>
+</property>
+
+<property>
+  <name>hive.exec.orc.default.compress</name>
+  <value>ZLIB</value>
+  <description>
+    Define the default compression codec for ORC file.
+  </description>
+</property>
+
+<property>
   <name>hive.exec.orc.dictionary.key.size.threshold</name>
   <value>0.8</value>
   <description>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Thu Feb 13 17:54:33 2014
@@ -104,14 +104,6 @@ public final class OrcFile {
   public static final String ENABLE_INDEXES = "orc.create.index";
   public static final String BLOCK_PADDING = "orc.block.padding";
 
-  static final long DEFAULT_STRIPE_SIZE =
-      HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal;
-  static final CompressionKind DEFAULT_COMPRESSION_KIND =
-    CompressionKind.ZLIB;
-  static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
-  static final int DEFAULT_ROW_INDEX_STRIDE = 10000;
-  static final boolean DEFAULT_BLOCK_PADDING = true;
-
   // unused
   private OrcFile() {}
 
@@ -140,10 +132,10 @@ public final class OrcFile {
     private FileSystem fileSystemValue = null;
     private ObjectInspector inspectorValue = null;
     private long stripeSizeValue;
-    private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
-    private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
-    private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
-    private CompressionKind compressValue = DEFAULT_COMPRESSION_KIND;
+    private int rowIndexStrideValue;
+    private int bufferSizeValue;
+    private boolean blockPaddingValue;
+    private CompressionKind compressValue;
     private MemoryManager memoryManagerValue;
     private Version versionValue;
 
@@ -152,7 +144,22 @@ public final class OrcFile {
       memoryManagerValue = getMemoryManager(conf);
       stripeSizeValue =
           conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
-             DEFAULT_STRIPE_SIZE);
+              HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal);
+      rowIndexStrideValue =
+          conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE
+              .varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE.defaultIntVal);
+      bufferSizeValue =
+          conf.getInt(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.varname,
+              HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE.defaultIntVal);
+      blockPaddingValue =
+          conf.getBoolean(HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING
+              .varname, HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING
+              .defaultBoolVal);
+      compressValue = 
+          CompressionKind.valueOf(conf.get(HiveConf.ConfVars
+              .HIVE_ORC_DEFAULT_COMPRESS.varname,
+              HiveConf.ConfVars
+              .HIVE_ORC_DEFAULT_COMPRESS.defaultVal));
       String versionName =
         conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
       if (versionName == null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Feb 13 17:54:33 2014
@@ -44,9 +44,8 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.orc.Metadata;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileGenerator;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator;
 import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -63,7 +62,6 @@ import org.apache.hadoop.mapred.InputFor
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
 
@@ -99,8 +97,8 @@ public class OrcInputFormat  implements 
   private static final double MIN_INCLUDED_LOCATION = 0.80;
 
   private static class OrcRecordReader
-      implements RecordReader<NullWritable, OrcStruct> {
-    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+      implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> {
+    private final RecordReader reader;
     private final long offset;
     private final long length;
     private final int numColumns;
@@ -111,10 +109,7 @@ public class OrcInputFormat  implements 
                     long offset, long length) throws IOException {
       List<OrcProto.Type> types = file.getTypes();
       numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
-      boolean[] includedColumns = findIncludedColumns(types, conf);
-      String[] columnNames = getIncludedColumnNames(types, includedColumns, conf);
-      SearchArgument sarg = createSarg(types, conf);
-      this.reader = file.rows(offset, length, includedColumns, sarg, columnNames);
+      this.reader = createReaderFromFile(file, conf, offset, length);
       this.offset = offset;
       this.length = length;
     }
@@ -155,6 +150,19 @@ public class OrcInputFormat  implements 
       return progress;
     }
   }
+  
+  static RecordReader createReaderFromFile(
+      Reader file, Configuration conf, long offset, long length)
+      throws IOException {
+    List<OrcProto.Type> types = file.getTypes();
+    boolean[] includedColumns = findIncludedColumns(types, conf);
+    String[] columnNames = getIncludedColumnNames(types, includedColumns,
+        conf);
+    SearchArgument sarg = createSarg(types, conf);
+    RecordReader reader =
+        file.rows(offset, length, includedColumns, sarg, columnNames);
+    return reader;
+  }
 
   private static final PathFilter hiddenFileFilter = new PathFilter(){
     public boolean accept(Path p){
@@ -244,14 +252,15 @@ public class OrcInputFormat  implements 
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public RecordReader<NullWritable, OrcStruct>
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
       getRecordReader(InputSplit inputSplit, JobConf conf,
                       Reporter reporter) throws IOException {
     if (isVectorMode(conf)) {
-      RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
+      org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
           reporter);
-      return (RecordReader) vorr;
+      return (org.apache.hadoop.mapred.RecordReader) vorr;
     }
     FileSplit fSplit = (FileSplit)inputSplit;
     reporter.setStatus(fSplit.toString());
@@ -308,7 +317,7 @@ public class OrcInputFormat  implements 
    * @param conf The configuration of the job
    * @return the list of input {@link Path}s for the map-reduce job.
    */
-  static Path[] getInputPaths(JobConf conf) throws IOException {
+  static Path[] getInputPaths(Configuration conf) throws IOException {
     String dirs = conf.get("mapred.input.dir");
     if (dirs == null) {
       throw new IOException("Configuration mapred.input.dir is not defined.");
@@ -326,10 +335,41 @@ public class OrcInputFormat  implements 
    * the different worker threads.
    */
   static class Context {
+    static class FileSplitInfo {
+      FileSplitInfo(Path file, long start, long length, String[] hosts,
+          FileMetaInfo fileMetaInfo) {
+        this.file = file;
+        this.start = start;
+        this.length = length;
+        this.hosts = hosts;
+        this.fileMetaInfo = fileMetaInfo;
+      }
+      Path getPath() {
+        return file;
+      }
+      long getStart() {
+        return start;
+      }
+      long getLength() {
+        return length;
+      }
+      String[] getLocations() {
+        return hosts;
+      }
+      FileMetaInfo getFileMetaInfo() {
+        return fileMetaInfo;
+      }
+      private Path file;
+      private long start;
+      private long length;
+      private String[] hosts;
+      FileMetaInfo fileMetaInfo;
+    }
     private final Configuration conf;
     private static Cache<Path, FileInfo> footerCache;
     private final ExecutorService threadPool;
-    private final List<OrcSplit> splits = new ArrayList<OrcSplit>(10000);
+    private final List<FileSplitInfo> splits =
+        new ArrayList<FileSplitInfo>(10000);
     private final List<Throwable> errors = new ArrayList<Throwable>();
     private final HadoopShims shims = ShimLoader.getHadoopShims();
     private final long maxSize;
@@ -378,7 +418,7 @@ public class OrcInputFormat  implements 
      *     the back.
      * @result the Nth file split
      */
-    OrcSplit getResult(int index) {
+    FileSplitInfo getResult(int index) {
       if (index >= 0) {
         return splits.get(index);
       } else {
@@ -556,8 +596,8 @@ public class OrcInputFormat  implements 
       if(locations.length == 1 && file.getLen() < context.maxSize) {
         String[] hosts = locations[0].getHosts();
         synchronized (context.splits) {
-          context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
-                hosts, fileMetaInfo));
+          context.splits.add(new Context.FileSplitInfo(file.getPath(), 0,
+              file.getLen(), hosts, fileMetaInfo));
         }
       } else {
         // if it requires a compute task
@@ -643,8 +683,8 @@ public class OrcInputFormat  implements 
         hostList.toArray(hosts);
       }
       synchronized (context.splits) {
-        context.splits.add(new OrcSplit(file.getPath(), offset, length,
-            hosts, fileMetaInfo));
+        context.splits.add(new Context.FileSplitInfo(file.getPath(), offset,
+            length, hosts, fileMetaInfo));
       }
     }
 
@@ -851,35 +891,45 @@ public class OrcInputFormat  implements 
     }
   }
 
+  static List<Context.FileSplitInfo> generateSplitsInfo(Configuration conf)
+      throws IOException {
+	  // use threads to resolve directories into splits
+	  Context context = new Context(conf);
+	  for(Path dir: getInputPaths(conf)) {
+	    FileSystem fs = dir.getFileSystem(conf);
+	    context.schedule(new FileGenerator(context, fs, dir));
+	  }
+	  context.waitForTasks();
+	  // deal with exceptions
+	  if (!context.errors.isEmpty()) {
+	    List<IOException> errors =
+	        new ArrayList<IOException>(context.errors.size());
+	    for(Throwable th: context.errors) {
+	      if (th instanceof IOException) {
+	        errors.add((IOException) th);
+	      } else {
+	        throw new RuntimeException("serious problem", th);
+	      }
+	    }
+	    throw new InvalidInputException(errors);
+	  }
+    if (context.cacheStripeDetails) {
+      LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
+          + context.numFilesCounter.get());
+    }
+	  return context.splits;
+  }
   @Override
   public InputSplit[] getSplits(JobConf job,
                                 int numSplits) throws IOException {
-    // use threads to resolve directories into splits
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
-    Context context = new Context(job);
-    for(Path dir: getInputPaths(job)) {
-      FileSystem fs = dir.getFileSystem(job);
-      context.schedule(new FileGenerator(context, fs, dir));
-    }
-    context.waitForTasks();
-    // deal with exceptions
-    if (!context.errors.isEmpty()) {
-      List<IOException> errors =
-          new ArrayList<IOException>(context.errors.size());
-      for(Throwable th: context.errors) {
-        if (th instanceof IOException) {
-          errors.add((IOException) th);
-        } else {
-          throw new RuntimeException("serious problem", th);
-        }
-      }
-      throw new InvalidInputException(errors);
-    }
-    InputSplit[] result = new InputSplit[context.splits.size()];
-    context.splits.toArray(result);
-    if (context.cacheStripeDetails) {
-      LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
-          + context.numFilesCounter.get());
+    List<OrcInputFormat.Context.FileSplitInfo> splits =
+        OrcInputFormat.generateSplitsInfo(job);
+    InputSplit[] result = new InputSplit[splits.size()];
+    for (int i=0;i<splits.size();i++) {
+      OrcInputFormat.Context.FileSplitInfo split = splits.get(i);
+      result[i] = new OrcSplit(split.getPath(), split.getStart(),
+          split.getLength(), split.getLocations(), split.getFileMetaInfo());
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
     return result;

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/** An InputFormat for ORC files. Keys are meaningless,
+ * value is the OrcStruct object */
+public class OrcNewInputFormat extends InputFormat<NullWritable, OrcStruct>{
+  private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+  private static final String CLASS_NAME = ReaderImpl.class.getName();
+
+  @Override
+  public RecordReader<NullWritable, OrcStruct> createRecordReader(
+      InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    FileSplit fileSplit = (FileSplit) inputSplit;
+    Path path = fileSplit.getPath();
+    FileSystem fs = path.getFileSystem(ShimLoader.getHadoopShims()
+        .getConfiguration(context));
+    return new OrcRecordReader(OrcFile.createReader(fs, path),
+        ShimLoader.getHadoopShims().getConfiguration(context),
+        fileSplit.getStart(), fileSplit.getLength());
+  }
+
+  private static class OrcRecordReader
+    extends RecordReader<NullWritable, OrcStruct> {
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final int numColumns;
+    OrcStruct value;
+    private float progress = 0.0f;
+
+    OrcRecordReader(Reader file, Configuration conf,
+                    long offset, long length) throws IOException {
+      List<OrcProto.Type> types = file.getTypes();
+      numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+      value = new OrcStruct(numColumns);
+      this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset,
+          length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return NullWritable.get();
+    }
+
+
+    @Override
+    public OrcStruct getCurrentValue() throws IOException,
+        InterruptedException {
+      return value;
+    }
+
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return progress;
+    }
+
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+    }
+
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reader.hasNext()) {
+        reader.next(value);
+        progress = reader.getProgress();
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
+    List<OrcInputFormat.Context.FileSplitInfo> splits =
+        OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
+        .getConfiguration(jobContext));
+    List<InputSplit> result = new ArrayList<InputSplit>();
+    for (OrcInputFormat.Context.FileSplitInfo split : splits) {
+      FileSplit newSplit = new OrcNewSplit(split.getPath(),
+          split.getStart(), split.getLength(), split.getLocations(),
+          split.getFileMetaInfo());
+      result.add(newSplit);
+    }
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
+    return result;
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/** An OutputFormat that writes ORC files. */
+public class OrcNewOutputFormat extends
+    FileOutputFormat<NullWritable, OrcSerdeRow> {
+
+  private static class OrcRecordWriter
+  extends RecordWriter<NullWritable, OrcSerdeRow> {
+    private Writer writer = null;
+    private final Path path;
+    private final OrcFile.WriterOptions options;
+    OrcRecordWriter(Path path, OrcFile.WriterOptions options) {
+      this.path = path;
+      this.options = options;
+    }
+    @Override
+    public void write(NullWritable key, OrcSerdeRow row)
+        throws IOException, InterruptedException {
+      if (writer == null) {
+        options.inspector(row.getInspector());
+        writer = OrcFile.createWriter(path, options);
+      }
+      writer.addRow(row.getRow());
+    }
+
+    @Override
+    public void close(TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      if (writer == null) {
+        // a row with no columns
+        ObjectInspector inspector = ObjectInspectorFactory.
+            getStandardStructObjectInspector(new ArrayList<String>(),
+                new ArrayList<ObjectInspector>());
+        options.inspector(inspector);
+        writer = OrcFile.createWriter(path, options);
+      }
+      writer.close();
+    }
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Path file = getDefaultWorkFile(context, "");
+    return new
+        OrcRecordWriter(file, OrcFile.writerOptions(
+            ShimLoader.getHadoopShims().getConfiguration(context)));
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * OrcFileSplit. Holds file meta info
+ *
+ */
+public class OrcNewSplit extends FileSplit {
+  private Reader.FileMetaInfo fileMetaInfo;
+  private boolean hasFooter;
+  
+  protected OrcNewSplit(){
+    //The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
+    //This constructor is used to create the object and then call readFields()
+    // so just pass nulls to this super constructor.
+    super(null, 0, 0, (String[])null);
+  }
+  
+  public OrcNewSplit(Path path, long offset, long length, String[] hosts,
+      FileMetaInfo fileMetaInfo) {
+    super(path, offset, length, hosts);
+    this.fileMetaInfo = fileMetaInfo;
+    hasFooter = this.fileMetaInfo != null;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    //serialize path, offset, length using FileSplit
+    super.write(out);
+
+    // Whether footer information follows.
+    out.writeBoolean(hasFooter);
+
+    if (hasFooter) {
+      // serialize FileMetaInfo fields
+      Text.writeString(out, fileMetaInfo.compressionType);
+      WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+      WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
+
+      // serialize FileMetaInfo field footer
+      ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
+      footerBuff.reset();
+
+      // write length of buffer
+      WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
+      out.write(footerBuff.array(), footerBuff.position(),
+          footerBuff.limit() - footerBuff.position());
+    }
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    //deserialize path, offset, length using FileSplit
+    super.readFields(in);
+
+    hasFooter = in.readBoolean();
+
+    if (hasFooter) {
+      // deserialize FileMetaInfo fields
+      String compressionType = Text.readString(in);
+      int bufferSize = WritableUtils.readVInt(in);
+      int metadataSize = WritableUtils.readVInt(in);
+
+      // deserialize FileMetaInfo field footer
+      int footerBuffSize = WritableUtils.readVInt(in);
+      ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
+      in.readFully(footerBuff.array(), 0, footerBuffSize);
+
+      fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff);
+    }
+  }
+
+  public FileMetaInfo getFileMetaInfo(){
+    return fileMetaInfo;
+  }
+
+  public boolean hasFooter() {
+    return hasFooter;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Thu Feb 13 17:54:33 2014
@@ -43,7 +43,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.Writable;
 
-final class OrcStruct implements Writable {
+final public class OrcStruct implements Writable {
 
   private Object[] fields;
 
@@ -461,7 +461,7 @@ final class OrcStruct implements Writabl
     }
   }
 
-  static ObjectInspector createObjectInspector(TypeInfo info) {
+  static public ObjectInspector createObjectInspector(TypeInfo info) {
     switch (info.getCategory()) {
       case PRIMITIVE:
         switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) {

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Thu Feb 13 17:54:33 2014
@@ -428,7 +428,7 @@ public class TestInputOutputFormat {
         new OrcInputFormat.SplitGenerator(context, fs,
             fs.getFileStatus(new Path("/a/file")), null);
     splitter.createSplit(0, 200, null);
-    FileSplit result = context.getResult(-1);
+    OrcInputFormat.Context.FileSplitInfo result = context.getResult(-1);
     assertEquals(0, result.getStart());
     assertEquals(200, result.getLength());
     assertEquals("/a/file", result.getPath().toString());
@@ -477,7 +477,7 @@ public class TestInputOutputFormat {
       }
       throw new IOException("Errors during splitting");
     }
-    FileSplit result = context.getResult(0);
+    OrcInputFormat.Context.FileSplitInfo result = context.getResult(0);
     assertEquals(3, result.getStart());
     assertEquals(497, result.getLength());
     result = context.getResult(1);

Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java?rev=1567987&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewInputOutputFormat.java Thu Feb 13 17:54:33 2014
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.assertFalse;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hive.common.util.HiveTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestNewInputOutputFormat {
+  
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+    "target" + File.separator + "test" + File.separator + "tmp"));
+  
+  Configuration conf;
+  FileSystem localFs;
+  
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    conf.set("mapred.job.tracker", "local");  // presumably selects the in-process local job runner
+    conf.set("fs.default.name", "local");     // presumably resolves to the local file system
+    localFs = FileSystem.get(conf);           // used by the tests below for output checks and cleanup
+  }
+  
+  @Rule
+  public TestName testCaseName = new TestName();
+  
+  public static class OrcTestMapper1 extends Mapper<Object, Writable, Text, Text> {
+    @Override
+    public void map(Object key, Writable value, Context context)
+        throws IOException, InterruptedException {
+      final Text rendered = new Text(value.toString());  // textual form of the record
+      context.write(null, rendered);  // null key: only the value is emitted
+    }
+  }
+
+  @Test
+  // Reads the bundled ORC file through OrcNewInputFormat; checks the row count and the last row
+  public void testNewInputFormat() throws Exception {
+    Job job = new Job(conf, "orc test");
+    job.setInputFormatClass(OrcNewInputFormat.class);
+    job.setJarByClass(TestNewInputOutputFormat.class);
+    job.setMapperClass(OrcTestMapper1.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);  // matches the Text values OrcTestMapper1 emits
+    FileInputFormat.addInputPath(job,
+        new Path(HiveTestUtils.getFileFromClasspath("orc-file-11-format.orc")));
+    Path outputPath = new Path(workDir,
+        "TestOrcFile." + testCaseName.getMethodName() + ".txt");
+    localFs.delete(outputPath, true);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    boolean result = job.waitForCompletion(true);
+    assertTrue(result);
+    Path outputFilePath = new Path(outputPath, "part-m-00000");
+
+    assertTrue(localFs.exists(outputFilePath));
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(localFs.open(outputFilePath)));
+    int count=0;
+    String line;
+    String lastLine=null;
+    while ((line=reader.readLine()) != null) {
+      count++;
+      lastLine = line;
+    }
+    reader.close();
+    assertEquals(7500, count);  // junit order is (expected, actual)
+    assertEquals("{true, 100, 2048, 65536," +
+        " 9223372036854775807, 2.0, -5.0" +
+        ", , bye, {[{1, bye}, {2, sigh}]}, [{100000000, cat}," +
+        " {-100000, in}, {1234, hat}]," +
+        " {chani={5, chani}, mauddib={1, mauddib}}," +
+        " 2000-03-12 15:00:01.0, 12345678.6547457}", lastLine);
+    localFs.delete(outputPath, true);
+  }
+  
+  public static class OrcTestMapper2 extends Mapper<Object, Text, Object, Writable> {
+    private final TypeInfo typeInfo =
+        TypeInfoUtils.getTypeInfoFromTypeString("struct<a:int,b:string>");
+    private final ObjectInspector oip =
+        TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
+    private final OrcSerde serde = new OrcSerde();
+    private Writable row;
+    @Override
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      final String[] parts = value.toString().split(",");  // input lines look like "int,string"
+      final List<Object> fields = new ArrayList<Object>(2);
+      fields.add(Integer.parseInt(parts[0]));  // struct field a
+      fields.add(parts[1]);                    // struct field b
+      row = serde.serialize(fields, oip);
+      context.write(null, row);  // null key: only the serialized row is written
+    }
+  }
+  
+  @Test
+  // Writes "int,uuid" rows via OrcNewOutputFormat; checks row count, compression, schema, first row
+  public void testNewOutputFormat() throws Exception {
+    int rownum=1000;
+    
+    Path inputPath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".txt");
+    Path outputPath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    localFs.delete(outputPath, true);
+    PrintWriter pw = new PrintWriter(
+        new OutputStreamWriter(localFs.create(inputPath)));
+    Random r = new Random(1000L);
+    boolean firstRow = true;
+    int firstIntValue = 0;
+    String firstStringValue = null;
+    for (int i=0;i<rownum;i++) {
+      int intValue = r.nextInt();
+      String stringValue = UUID.randomUUID().toString();
+      if (firstRow) {
+        firstRow = false;
+        firstIntValue = intValue;
+        firstStringValue = stringValue;
+      }
+      pw.println(intValue + "," + stringValue);
+    }
+    pw.close();
+
+    Job job = new Job(conf, "orc test");
+    job.setOutputFormatClass(OrcNewOutputFormat.class);
+    job.setJarByClass(TestNewInputOutputFormat.class);
+    job.setMapperClass(OrcTestMapper2.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Writable.class);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    boolean result = job.waitForCompletion(true);
+    assertTrue(result);
+    
+    Path outputFilePath = new Path(outputPath, "part-m-00000");
+    assertTrue(localFs.exists(outputFilePath));
+    Reader reader = OrcFile.createReader(localFs, outputFilePath);
+    assertEquals(rownum, reader.getNumberOfRows());  // reports both values on failure
+    assertEquals(CompressionKind.ZLIB, reader.getCompression());
+    StructObjectInspector soi =
+        (StructObjectInspector)reader.getObjectInspector();
+    StructTypeInfo ti =
+        (StructTypeInfo)TypeInfoUtils.getTypeInfoFromObjectInspector(soi);
+    assertEquals(PrimitiveObjectInspector.PrimitiveCategory.INT,
+        ((PrimitiveTypeInfo)ti.getAllStructFieldTypeInfos().get(0))
+        .getPrimitiveCategory());
+    assertEquals(PrimitiveObjectInspector.PrimitiveCategory.STRING,
+        ((PrimitiveTypeInfo)ti.getAllStructFieldTypeInfos().get(1))
+        .getPrimitiveCategory());
+    
+    RecordReader rows = reader.rows(null);
+    Object row = rows.next(null);
+    
+    IntWritable intWritable = (IntWritable)soi.getStructFieldData(row,
+        soi.getAllStructFieldRefs().get(0));
+    Text text = (Text)soi.getStructFieldData(row,
+        soi.getAllStructFieldRefs().get(1));
+    
+    assertEquals(firstIntValue, intWritable.get());  // junit order is (expected, actual)
+    assertEquals(firstStringValue, text.toString());
+    
+    localFs.delete(outputPath, true);
+  }
+  
+  @Test
+  // Sets hive.exec.orc.default.compress to SNAPPY and checks the written file reports SNAPPY
+  public void testNewOutputFormatWithCompression() throws Exception {
+    conf.set("hive.exec.orc.default.compress", "SNAPPY");
+    
+    Path inputPath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".txt");
+    Path outputPath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    localFs.delete(outputPath, true);
+    PrintWriter pw = new PrintWriter(
+        new OutputStreamWriter(localFs.create(inputPath)));
+    pw.println("1,hello");
+    pw.println("2,world");
+    pw.close();
+
+    Job job = new Job(conf, "orc test");
+    job.setOutputFormatClass(OrcNewOutputFormat.class);
+    job.setJarByClass(TestNewInputOutputFormat.class);
+    job.setMapperClass(OrcTestMapper2.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(OrcSerdeRow.class);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    boolean result = job.waitForCompletion(true);
+    assertTrue(result);
+    
+    Path outputFilePath = new Path(outputPath, "part-m-00000");
+    Reader reader = OrcFile.createReader(localFs, outputFilePath);
+    assertEquals(CompressionKind.SNAPPY, reader.getCompression());  // (expected, actual)
+    
+    localFs.delete(outputPath, true);
+  }
+  
+  public static class OrcTestMapper3 extends Mapper<Object, Text, IntWritable, Text> {
+    @Override
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      // Key each line by its whitespace-separated token count.
+      final int tokenCount = value.toString().split("\\s+").length;
+      context.write(new IntWritable(tokenCount), value);
+    }
+  }
+
+  /** Summarises all lines sharing a token count into one ORC row shaped like typeInfo. */
+  public static class OrcTestReducer3 extends
+      Reducer<IntWritable, Text, NullWritable, Writable> {
+    final static TypeInfo typeInfo =
+        TypeInfoUtils.getTypeInfoFromTypeString(
+        "struct<length:int,count:int,list:array" +
+        "<struct<lastword:string,lastwordlength:int>>," +
+        "wordcounts:map<string,int>>");
+    private final ObjectInspector oip =
+        TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
+    private final OrcSerde serde = new OrcSerde();
+    private Writable row;
+    @Override
+    public void reduce(IntWritable key, Iterable<Text> values, Context context)
+        throws IOException, InterruptedException {
+      // One {lastword, lastwordlength} entry per input line, in iteration order.
+      List<List<Object>> lastWordInfoList = new ArrayList<List<Object>>();
+      // Occurrences of each lower-cased token across all lines of this group.
+      Map<String, Integer> wordCounts = new HashMap<String, Integer>();
+      int lineCount = 0;
+      for (Text val : values) {
+        String[] tokens = val.toString().toLowerCase().split("\\s+");
+        String lastWord = tokens[tokens.length - 1];
+        List<Object> info = new ArrayList<Object>(2);
+        info.add(lastWord);
+        info.add(lastWord.length());
+        lastWordInfoList.add(info);
+        for (String token : tokens) {
+          // get() yields null for a token not seen before in this group
+          Integer seen = wordCounts.get(token);
+          wordCounts.put(token, seen == null ? 1 : seen + 1);
+        }
+        lineCount++;
+      }
+
+      // Field order follows typeInfo: length, count, list, wordcounts.
+      List<Object> struct = new ArrayList<Object>(4);
+      struct.add(key.get());
+      struct.add(lineCount);
+      struct.add(lastWordInfoList);
+      struct.add(wordCounts);
+      row = serde.serialize(struct, oip);
+      context.write(NullWritable.get(), row);
+    }
+  }
+  
+  
+  @SuppressWarnings("unchecked")
+  @Test
+  // Map+reduce job writing nested struct/array/map rows; checks each of the three output rows
+  public void testNewOutputFormatComplex() throws Exception {
+    Path inputPath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".txt");
+    Path outputPath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".orc");
+    localFs.delete(outputPath, true);
+    PrintWriter pw = new PrintWriter(
+        new OutputStreamWriter(localFs.create(inputPath)));
+    pw.println("I have eaten");
+    pw.println("the plums");
+    pw.println("that were in");
+    pw.println("the icebox");
+    pw.println("and which");
+    pw.println("you were probably");
+    pw.println("saving");
+    pw.println("for breakfast");
+    pw.println("Forgive me");
+    pw.println("they were delicious");
+    pw.println("so sweet");
+    pw.println("and so cold");
+    pw.close();
+
+    Job job = new Job(conf, "orc test");
+    job.setOutputFormatClass(OrcNewOutputFormat.class);
+    job.setJarByClass(TestNewInputOutputFormat.class);
+    job.setMapperClass(OrcTestMapper3.class);
+    job.setReducerClass(OrcTestReducer3.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(OrcSerdeRow.class);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    boolean result = job.waitForCompletion(true);
+    assertTrue(result);
+    
+    Path outputFilePath = new Path(outputPath, "part-r-00000");
+    Reader reader = OrcFile.createReader(localFs, outputFilePath);
+    
+    RecordReader rows = reader.rows(null);
+    ObjectInspector orcOi = reader.getObjectInspector();
+    ObjectInspector stoi = TypeInfoUtils
+        .getStandardJavaObjectInspectorFromTypeInfo(OrcTestReducer3.typeInfo);
+    ObjectInspectorConverters.Converter converter = ObjectInspectorConverters
+        .getConverter(orcOi, stoi);
+    
+    Object row = rows.next(null);  // group of one-word lines
+    List<Object> converted = (List<Object>)converter.convert(row);
+    assertEquals(1, converted.get(0));  // junit order is (expected, actual) throughout
+    assertEquals(1, converted.get(1));
+    List<Object> list = (List<Object>)converted.get(2);
+    assertEquals(1, list.size());
+    assertEquals("saving", ((List<Object>)list.get(0)).get(0));
+    assertEquals(6, ((List<Object>)list.get(0)).get(1));
+    Map<String, Integer> map = (Map<String, Integer>)converted.get(3);
+    assertEquals(1, map.size());
+    assertEquals(Integer.valueOf(1), map.get("saving"));
+    
+    row = rows.next(null);  // group of two-word lines
+    converted = (List<Object>)converter.convert(row);
+    assertEquals(2, converted.get(0));
+    assertEquals(6, converted.get(1));
+    list = (List<Object>)converted.get(2);
+    assertEquals(6, list.size());
+    assertEquals("plums", ((List<Object>)list.get(0)).get(0));
+    assertEquals(5, ((List<Object>)list.get(0)).get(1));
+    map = (Map<String, Integer>)converted.get(3);
+    assertEquals(11, map.size());
+    assertEquals(Integer.valueOf(2), map.get("the"));
+    
+    row = rows.next(null);  // group of three-word lines
+    converted = (List<Object>)converter.convert(row);
+    assertEquals(3, converted.get(0));
+    assertEquals(5, converted.get(1));
+    list = (List<Object>)converted.get(2);
+    assertEquals(5, list.size());
+    assertEquals("eaten", ((List<Object>)list.get(0)).get(0));
+    assertEquals(5, ((List<Object>)list.get(0)).get(1));
+    map = (Map<String, Integer>)converted.get(3);
+    assertEquals(13, map.size());
+    assertEquals(Integer.valueOf(3), map.get("were"));
+    
+    assertFalse(rows.hasNext());
+    
+    localFs.delete(outputPath, true);
+  }
+  
+  @Test
+  // Projects columns 1 and 3 only; every other column must come back as null
+  public void testNewInputFormatPruning() throws Exception {
+    conf.set("hive.io.file.read.all.columns", "false");
+    conf.set("hive.io.file.readcolumn.ids", "1,3");
+    Job job = new Job(conf, "orc test");
+    job.setInputFormatClass(OrcNewInputFormat.class);
+    job.setJarByClass(TestNewInputOutputFormat.class);
+    job.setMapperClass(OrcTestMapper1.class);
+    job.setNumReduceTasks(0);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);  // matches the Text values OrcTestMapper1 emits
+    FileInputFormat.addInputPath(job, new Path(HiveTestUtils
+        .getFileFromClasspath("orc-file-11-format.orc")));
+    Path outputPath = new Path(workDir, "TestOrcFile." +
+        testCaseName.getMethodName() + ".txt");
+    localFs.delete(outputPath, true);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    boolean result = job.waitForCompletion(true);
+    assertTrue(result);
+    Path outputFilePath = new Path(outputPath, "part-m-00000");
+
+    BufferedReader reader = new BufferedReader(
+        new InputStreamReader(localFs.open(outputFilePath)));
+    String line=reader.readLine();
+    reader.close();  // release the stream before asserting so a failure cannot leak it
+    assertEquals("{null, 1, null, 65536, null, null, null, " +
+        "null, null, null, null, null, null, null}", line);
+
+    localFs.delete(outputPath, true);
+  }
+}

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java Thu Feb 13 17:54:33 2014
@@ -31,9 +31,9 @@ import org.apache.hadoop.util.StringUtil
 public final class ColumnProjectionUtils {
 
   public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
+  public static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
   public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
   private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
-  private static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
   private static final boolean READ_ALL_COLUMNS_DEFAULT = true;
 
   /**

Modified: hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Thu Feb 13 17:54:33 2014
@@ -773,4 +773,8 @@ public class Hadoop20Shims implements Ha
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
   }
+  @Override
+  public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
+    return context.getConfiguration();  // no translation needed: JobContext exposes it directly
+  }
 }

Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Feb 13 17:54:33 2014
@@ -410,4 +410,9 @@ public class Hadoop20SShims extends Hado
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
   }
+  
+  @Override
+  public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
+    return context.getConfiguration();  // no translation needed: JobContext exposes it directly
+  }
 }

Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Feb 13 17:54:33 2014
@@ -557,4 +557,9 @@ public class Hadoop23Shims extends Hadoo
     ret.put("MAPREDTASKCLEANUPNEEDED", "mapreduce.job.committer.task.cleanup.needed");
     return ret;
   }
+  
+  @Override
+  public Configuration getConfiguration(org.apache.hadoop.mapreduce.JobContext context) {
+    return context.getConfiguration();  // no translation needed: JobContext exposes it directly
+  }
 }

Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1567987&r1=1567986&r2=1567987&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Feb 13 17:54:33 2014
@@ -520,4 +520,10 @@ public interface HadoopShims {
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
 
   public Map<String, String> getHadoopConfNames();
+
+  
+  /**
+   * Returns the {@link Configuration} associated with the given {@link JobContext}.
+   */
+  public Configuration getConfiguration(JobContext context);
 }