Posted to commits@orc.apache.org by om...@apache.org on 2016/06/02 03:59:04 UTC

[1/2] orc git commit: ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and OutputFormat. (omalley)

Repository: orc
Updated Branches:
  refs/heads/master 545fe3712 -> 3bb5ce532
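
For reference, a minimal sketch of how a job driver might wire up the new
org.apache.orc.mapreduce formats added here. The driver class, paths, and
schema below are hypothetical; only the ORC classes and the OrcConf key come
from this change:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.orc.OrcConf;
  import org.apache.orc.mapreduce.OrcInputFormat;
  import org.apache.orc.mapreduce.OrcOutputFormat;

  public class OrcJobDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Schema of the rows the job writes; read by OrcOutputFormat.
      OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<i:int,s:string>");
      Job job = Job.getInstance(conf, "orc mapreduce example");
      // Mapper/reducer classes and output key/value classes omitted here.
      job.setInputFormatClass(OrcInputFormat.class);
      job.setOutputFormatClass(OrcOutputFormat.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }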


http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
new file mode 100644
index 0000000..f686e05
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcMapredRecordReader;
+import org.apache.orc.mapred.OrcStruct;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapreduce API.
+ * It shares its value-reading implementation with the record reader for
+ * the org.apache.orc.mapred API.
+ * @param <V> the root type of the file
+ */
+public class OrcMapreduceRecordReader<V extends WritableComparable>
+    extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, V> {
+  private final TypeDescription schema;
+  private final RecordReader batchReader;
+  private final VectorizedRowBatch batch;
+  private int rowInBatch;
+  private final V row;
+
+  public OrcMapreduceRecordReader(Reader fileReader,
+                                  Reader.Options options) throws IOException {
+    this.batchReader = fileReader.rows(options);
+    if (options.getSchema() == null) {
+      schema = fileReader.getSchema();
+    } else {
+      schema = options.getSchema();
+    }
+    this.batch = schema.createRowBatch();
+    rowInBatch = 0;
+    this.row = (V) OrcStruct.createValue(schema);
+  }
+
+  /**
+   * If the current batch has been fully consumed, read the next one.
+   * @return true if more rows are available.
+   * @throws IOException if the underlying file reader fails
+   */
+  boolean ensureBatch() throws IOException {
+    if (rowInBatch >= batch.size) {
+      rowInBatch = 0;
+      return batchReader.nextBatch(batch);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    batchReader.close();
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+                         TaskAttemptContext taskAttemptContext) {
+    // nothing required
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!ensureBatch()) {
+      return false;
+    }
+    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+      OrcStruct result = (OrcStruct) row;
+      List<TypeDescription> children = schema.getChildren();
+      int numberOfChildren = children.size();
+      for(int i=0; i < numberOfChildren; ++i) {
+        result.setFieldValue(i, OrcMapredRecordReader.nextValue(batch.cols[i], rowInBatch,
+            children.get(i), result.getFieldValue(i)));
+      }
+    } else {
+      OrcMapredRecordReader.nextValue(batch.cols[0], rowInBatch, schema, row);
+    }
+    rowInBatch += 1;
+    return true;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return batchReader.getProgress();
+  }
+}
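
A minimal sketch of driving this record reader by hand, outside of a task
(illustrative only, not part of the patch; it assumes the usual public no-arg
Reader.Options constructor and an ORC file with a struct root type):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.orc.OrcFile;
  import org.apache.orc.Reader;
  import org.apache.orc.mapred.OrcStruct;
  import org.apache.orc.mapreduce.OrcMapreduceRecordReader;

  public class DumpOrcFile {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Reader file = OrcFile.createReader(new Path(args[0]),
          OrcFile.readerOptions(conf));
      // With no schema set in the options, the reader uses the file's schema.
      OrcMapreduceRecordReader<OrcStruct> rows =
          new OrcMapreduceRecordReader<>(file, new Reader.Options());
      while (rows.nextKeyValue()) {
        // getCurrentValue() returns the same reused row object on each call.
        System.out.println(rows.getCurrentValue());
      }
      rows.close();
    }
  }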

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
new file mode 100644
index 0000000..9379584
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcMapredRecordWriter;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+
+import java.io.IOException;
+
+public class OrcMapreduceRecordWriter<V extends Writable>
+    extends RecordWriter<NullWritable, V> {
+
+  private final Writer writer;
+  private final VectorizedRowBatch batch;
+  private final TypeDescription schema;
+  private final boolean isTopStruct;
+
+  public OrcMapreduceRecordWriter(Writer writer) {
+    this.writer = writer;
+    schema = writer.getSchema();
+    this.batch = schema.createRowBatch();
+    isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+  }
+
+  @Override
+  public void write(NullWritable nullWritable, V v) throws IOException {
+    // if the batch is full, write it out.
+    if (batch.size == batch.getMaxSize()) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+
+    // add the new row
+    int row = batch.size++;
+    // unwrap an OrcKey or OrcValue to get the underlying row
+    if (v instanceof OrcKey) {
+      v = (V)((OrcKey) v).key;
+    } else if (v instanceof OrcValue) {
+      v = (V)((OrcValue) v).value;
+    }
+    if (isTopStruct) {
+      for(int f=0; f < schema.getChildren().size(); ++f) {
+        OrcMapredRecordWriter.setColumn(schema.getChildren().get(f),
+            batch.cols[f], row, ((OrcStruct) v).getFieldValue(f));
+      }
+    } else {
+      OrcMapredRecordWriter.setColumn(schema, batch.cols[0], row, v);
+    }
+  }
+
+  @Override
+  public void close(TaskAttemptContext taskAttemptContext) throws IOException {
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+  }
+}
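
The writer copies each row into a VectorizedRowBatch and only calls
Writer.addRowBatch when the batch fills up; close() flushes the final partial
batch, so callers can safely reuse one row object. A minimal sketch of using
it directly, with a made-up path and schema (illustrative only):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.orc.OrcFile;
  import org.apache.orc.TypeDescription;
  import org.apache.orc.Writer;
  import org.apache.orc.mapred.OrcStruct;
  import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;

  public class WriteSampleOrc {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      TypeDescription schema =
          TypeDescription.fromString("struct<i:int,s:string>");
      Writer file = OrcFile.createWriter(new Path("sample.orc"),
          OrcFile.writerOptions(conf).setSchema(schema));
      OrcMapreduceRecordWriter<OrcStruct> writer =
          new OrcMapreduceRecordWriter<>(file);
      OrcStruct row = (OrcStruct) OrcStruct.createValue(schema);
      NullWritable nada = NullWritable.get();
      for(int r = 0; r < 10; ++r) {
        row.setFieldValue(0, new IntWritable(r));
        row.setFieldValue(1, new Text("row " + r));
        writer.write(nada, row);   // copied into the batch; row is reusable
      }
      writer.close(null);          // flushes the last partial batch
    }
  }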

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
new file mode 100644
index 0000000..797998c
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
+/**
+ * An ORC output format that satisfies the org.apache.hadoop.mapreduce API.
+ */
+public class OrcOutputFormat<V extends Writable>
+    extends FileOutputFormat<NullWritable, V> {
+  private static final String EXTENSION = ".orc";
+  // This is useful for unit tests or local runs where you don't need the
+  // output committer.
+  public static final String SKIP_TEMP_DIRECTORY =
+      "orc.mapreduce.output.skip-temporary-directory";
+
+  @Override
+  public RecordWriter<NullWritable, V>
+       getRecordWriter(TaskAttemptContext taskAttemptContext
+                       ) throws IOException {
+    Configuration conf = taskAttemptContext.getConfiguration();
+    Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION);
+    Writer writer = OrcFile.createWriter(filename,
+        org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
+    return new OrcMapreduceRecordWriter<V>(writer);
+  }
+
+  @Override
+  public Path getDefaultWorkFile(TaskAttemptContext context,
+                                 String extension) throws IOException {
+    if (context.getConfiguration().getBoolean(SKIP_TEMP_DIRECTORY, false)) {
+      return new Path(getOutputPath(context),
+          getUniqueFile(context, getOutputName(context), extension));
+    } else {
+      return super.getDefaultWorkFile(context, extension);
+    }
+  }
+}
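
A small configuration sketch of the SKIP_TEMP_DIRECTORY knob, mirroring how
the tests below use it (the output path is a placeholder): with the flag set,
getDefaultWorkFile() returns a file directly under the job's output directory
instead of the committer's temporary attempt directory.

  Configuration conf = new Configuration();
  conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/orc-out");
  conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
  // A map task then writes straight to /tmp/orc-out/part-m-00000.orc.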

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
new file mode 100644
index 0000000..cd11603
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestMrUnit.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mrunit.MapReduceDriver;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestMrUnit {
+  JobConf conf = new JobConf();
+
+  /**
+   * Split the input struct into its two parts.
+   */
+  public static class MyMapper
+      implements Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
+    private OrcKey keyWrapper = new OrcKey();
+    private OrcValue valueWrapper = new OrcValue();
+
+    @Override
+    public void map(NullWritable key, OrcStruct value,
+                    OutputCollector<OrcKey, OrcValue> outputCollector,
+                    Reporter reporter) throws IOException {
+      keyWrapper.key = value.getFieldValue(0);
+      valueWrapper.value = value.getFieldValue(1);
+      outputCollector.collect(keyWrapper, valueWrapper);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // PASS
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      // PASS
+    }
+  }
+
+  /**
+   * Glue the key and values back together.
+   */
+  public static class MyReducer
+      implements Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
+    private OrcStruct output = new OrcStruct(TypeDescription.fromString
+        ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
+    private final NullWritable nada = NullWritable.get();
+
+    @Override
+    public void reduce(OrcKey key, Iterator<OrcValue> iterator,
+                       OutputCollector<NullWritable, OrcStruct> collector,
+                       Reporter reporter) throws IOException {
+      output.setFieldValue(0, key.key);
+      while (iterator.hasNext()) {
+        OrcValue value = iterator.next();
+        output.setFieldValue(1, value.value);
+        collector.collect(nada, output);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // PASS
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+      // PASS
+    }
+  }
+
+  /**
+   * This class is intended to support MRUnit's object copying for input and
+   * output objects.
+   *
+   * Real mapreduce contexts should NEVER use this class.
+   *
+   * The type string is serialized before each value.
+   */
+  public static class OrcStructSerialization
+      implements Serialization<OrcStruct> {
+
+    @Override
+    public boolean accept(Class<?> cls) {
+      return OrcStruct.class.isAssignableFrom(cls);
+    }
+
+    @Override
+    public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) {
+      return new Serializer<OrcStruct>() {
+        DataOutputStream dataOut;
+
+        public void open(OutputStream out) {
+          if(out instanceof DataOutputStream) {
+            dataOut = (DataOutputStream)out;
+          } else {
+            dataOut = new DataOutputStream(out);
+          }
+        }
+
+        public void serialize(OrcStruct w) throws IOException {
+          Text.writeString(dataOut, w.getSchema().toString());
+          w.write(dataOut);
+        }
+
+        public void close() throws IOException {
+          dataOut.close();
+        }
+      };
+    }
+
+    @Override
+    public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) {
+      return new Deserializer<OrcStruct>() {
+        DataInputStream input;
+
+        @Override
+        public void open(InputStream inputStream) throws IOException {
+          if(inputStream instanceof DataInputStream) {
+            input = (DataInputStream)inputStream;
+          } else {
+            input = new DataInputStream(inputStream);
+          }
+        }
+
+        @Override
+        public OrcStruct deserialize(OrcStruct orcStruct) throws IOException {
+          String typeStr = Text.readString(input);
+          OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr));
+          result.readFields(input);
+          return result;
+        }
+
+        @Override
+        public void close() throws IOException {
+          // PASS
+        }
+      };
+    }
+  }
+
+  @Test
+  public void testMapred() throws IOException {
+    conf.set("io.serializations",
+        OrcStructSerialization.class.getName() + "," +
+            WritableSerialization.class.getName());
+    OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
+    MyMapper mapper = new MyMapper();
+    mapper.configure(conf);
+    MyReducer reducer = new MyReducer();
+    reducer.configure(conf);
+    MapReduceDriver<NullWritable, OrcStruct,
+                    OrcKey, OrcValue,
+                    NullWritable, OrcStruct> driver =
+        new MapReduceDriver<>(mapper, reducer);
+    driver.setConfiguration(conf);
+    NullWritable nada = NullWritable.get();
+    OrcStruct input = (OrcStruct) OrcStruct.createValue(
+        TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>"));
+    IntWritable x =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0);
+    IntWritable y =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1);
+    Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0);
+
+    // generate the input stream
+    for(int r=0; r < 20; ++r) {
+      x.set(100 - (r / 4));
+      y.set(r*2);
+      z.set(Integer.toHexString(r));
+      driver.withInput(nada, input);
+    }
+
+    // generate the expected outputs
+    for(int g=4; g >= 0; --g) {
+      x.set(100 - g);
+      for(int i=0; i < 4; ++i) {
+        int r = g * 4 + i;
+        y.set(r * 2);
+        z.set(Integer.toHexString(r));
+        driver.withOutput(nada, input);
+      }
+    }
+    driver.runTest();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
new file mode 100644
index 0000000..a915ed3
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestOrcOutputFormat {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+  JobConf conf = new JobConf();
+  FileSystem fs;
+
+  {
+    try {
+      fs = FileSystem.getLocal(conf).getRaw();
+      fs.delete(workDir, true);
+      fs.mkdirs(workDir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+  }
+
+  static class NullOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // PASS
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) {
+      // PASS
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      // PASS
+    }
+  }
+
+  @Test
+  public void testAllTypes() throws Exception {
+    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," +
+        "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," +
+        "l:array<bigint>,map:map<smallint,string>," +
+        "str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    FileOutputFormat.setOutputPath(conf, workDir);
+    TypeDescription type = TypeDescription.fromString(typeStr);
+
+    // build a row object
+    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+    ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4);
+    ((BooleanWritable) row.getFieldValue(1)).set(true);
+    ((ByteWritable) row.getFieldValue(2)).set((byte) 23);
+    ((Text) row.getFieldValue(3)).set("aaabbbcccddd");
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+    ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays
+        (format.parse("2016-04-01").getTime()));
+    ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23"));
+    ((DoubleWritable) row.getFieldValue(6)).set(1.5);
+    ((FloatWritable) row.getFieldValue(7)).set(4.5f);
+    ((IntWritable) row.getFieldValue(8)).set(31415);
+    OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9);
+    longList.add(new LongWritable(123));
+    longList.add(new LongWritable(456));
+    OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10);
+    map.put(new ShortWritable((short) 1000), new Text("aaaa"));
+    map.put(new ShortWritable((short) 123), new Text("bbbb"));
+    OrcStruct struct = (OrcStruct) row.getFieldValue(11);
+    OrcUnion union = (OrcUnion) struct.getFieldValue(0);
+    union.set((byte) 1, new Text("abcde"));
+    ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00");
+    NullWritable nada = NullWritable.get();
+    RecordWriter<NullWritable, OrcStruct> writer =
+        new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc",
+            Reporter.NULL);
+    for(int r=0; r < 10; ++r) {
+      row.setFieldValue(8, new IntWritable(r * 10));
+      writer.write(nada, row);
+    }
+    union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
+    for(int r=0; r < 10; ++r) {
+      row.setFieldValue(8, new IntWritable(r * 10 + 100));
+      writer.write(nada, row);
+    }
+    OrcStruct row2 = new OrcStruct(type);
+    writer.write(nada, row2);
+    row.setFieldValue(8, new IntWritable(210));
+    writer.write(nada, row);
+    writer.close(Reporter.NULL);
+
+    FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000,
+        new String[0]);
+    RecordReader<NullWritable, OrcStruct> reader =
+        new OrcInputFormat<OrcStruct>().getRecordReader(split, conf,
+            Reporter.NULL);
+    nada = reader.createKey();
+    row = reader.createValue();
+    for(int r=0; r < 22; ++r) {
+      assertEquals(true, reader.next(nada, row));
+      if (r == 20) {
+        for(int c=0; c < 12; ++c) {
+          assertEquals(null, row.getFieldValue(c));
+        }
+      } else {
+        assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0));
+        assertEquals(new BooleanWritable(true), row.getFieldValue(1));
+        assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2));
+        assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3));
+        assertEquals(new DateWritable(DateWritable.millisToDays
+            (format.parse("2016-04-01").getTime())), row.getFieldValue(4));
+        assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5));
+        assertEquals(new DoubleWritable(1.5), row.getFieldValue(6));
+        assertEquals(new FloatWritable(4.5f), row.getFieldValue(7));
+        assertEquals(new IntWritable(r * 10), row.getFieldValue(8));
+        assertEquals(longList, row.getFieldValue(9));
+        assertEquals(map, row.getFieldValue(10));
+        if (r < 10) {
+          union.set((byte) 1, new Text("abcde"));
+        } else {
+          union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
+        }
+        assertEquals("row " + r, struct, row.getFieldValue(11));
+        assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"),
+            row.getFieldValue(12));
+      }
+    }
+    assertEquals(false, reader.next(nada, row));
+  }
+
+  /**
+   * Test the case where the top level isn't a struct, but a long.
+   */
+  @Test
+  public void testLongRoot() throws Exception {
+    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+    conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024);
+    conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11");
+    final String typeStr = "bigint";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    FileOutputFormat.setOutputPath(conf, workDir);
+    TypeDescription type = TypeDescription.fromString(typeStr);
+    LongWritable value = new LongWritable();
+    NullWritable nada = NullWritable.get();
+    RecordWriter<NullWritable, LongWritable> writer =
+        new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf,
+            "long.orc", Reporter.NULL);
+    for(long lo=0; lo < 2000; ++lo) {
+      value.set(lo);
+      writer.write(nada, value);
+    }
+    writer.close(Reporter.NULL);
+
+    Path path = new Path(workDir, "long.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(CompressionKind.SNAPPY, file.getCompressionKind());
+    assertEquals(2000, file.getNumberOfRows());
+    assertEquals(1000, file.getRowIndexStride());
+    assertEquals(64 * 1024, file.getCompressionSize());
+    assertEquals(OrcFile.Version.V_0_11, file.getFileVersion());
+    FileSplit split = new FileSplit(path, 0, 100000,
+        new String[0]);
+    RecordReader<NullWritable, LongWritable> reader =
+        new OrcInputFormat<LongWritable>().getRecordReader(split, conf,
+            Reporter.NULL);
+    nada = reader.createKey();
+    value = reader.createValue();
+    for(long lo=0; lo < 2000; ++lo) {
+      assertEquals(true, reader.next(nada, value));
+      assertEquals(lo, value.get());
+    }
+    assertEquals(false, reader.next(nada, value));
+  }
+
+  /**
+   * Make sure that the writer ignores the OrcKey
+   * @throws Exception
+   */
+  @Test
+  public void testOrcKey() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0");
+    String TYPE_STRING = "struct<i:int,s:string>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcKey key = new OrcKey(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(fs, conf, "key.orc",
+            Reporter.NULL);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 2000; ++r) {
+      ((OrcStruct) key.key).setAllFields(new IntWritable(r),
+          new Text(Integer.toString(r)));
+      writer.write(nada, key);
+    }
+    writer.close(Reporter.NULL);
+    Path path = new Path(workDir, "key.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(2000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+
+  /**
+   * Make sure that the writer ignores the OrcValue
+   * @throws Exception
+   */
+  @Test
+  public void testOrcValue() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0");
+    String TYPE_STRING = "struct<i:int>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setOutputCommitter(NullOutputCommitter.class);
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcValue value = new OrcValue(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(fs, conf, "value.orc",
+            Reporter.NULL);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 3000; ++r) {
+      ((OrcStruct) value.value).setAllFields(new IntWritable(r));
+      writer.write(nada, value);
+    }
+    writer.close(Reporter.NULL);
+    Path path = new Path(workDir, "value.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(3000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
index d32ce94..82699ed 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
@@ -76,6 +76,11 @@ public class TestOrcStruct {
     assertEquals(new IntWritable(42), struct.getFieldValue("i"));
     assertEquals(new DoubleWritable(1.5), struct.getFieldValue(1));
     assertEquals(new Text("Moria"), struct.getFieldValue("k"));
+    struct.setAllFields(new IntWritable(123), new DoubleWritable(4.5),
+        new Text("ok"));
+    assertEquals("123", struct.getFieldValue(0).toString());
+    assertEquals("4.5", struct.getFieldValue(1).toString());
+    assertEquals("ok", struct.getFieldValue(2).toString());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
deleted file mode 100644
index ce5523f..0000000
--- a/java/mapreduce/src/test/org/apache/orc/mapred/other/TestOrcOutputFormat.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.mapred.other;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.mapred.OrcInputFormat;
-import org.apache.orc.mapred.OrcList;
-import org.apache.orc.mapred.OrcMap;
-import org.apache.orc.mapred.OrcOutputFormat;
-import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapred.OrcTimestamp;
-import org.apache.orc.mapred.OrcUnion;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestOrcOutputFormat {
-
-  Path workDir = new Path(System.getProperty("test.tmp.dir",
-      "target" + File.separator + "test" + File.separator + "tmp"));
-  JobConf conf = new JobConf();
-  FileSystem fs;
-
-  {
-    try {
-      fs =  FileSystem.getLocal(conf).getRaw();
-      fs.delete(workDir, true);
-      fs.mkdirs(workDir);
-    } catch (IOException e) {
-      throw new IllegalStateException("bad fs init", e);
-    }
-  }
-
-  static class NullOutputCommitter extends OutputCommitter {
-
-    @Override
-    public void setupJob(JobContext jobContext) {
-      // PASS
-    }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskAttemptContext) {
-
-    }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
-      return false;
-    }
-
-    @Override
-    public void commitTask(TaskAttemptContext taskAttemptContext) {
-      // PASS
-    }
-
-    @Override
-    public void abortTask(TaskAttemptContext taskAttemptContext) {
-      // PASS
-    }
-  }
-
-  @Test
-  public void testAllTypes() throws Exception {
-    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
-    conf.setOutputCommitter(NullOutputCommitter.class);
-    final String typeStr = "struct<b1:binary,b2:boolean,b3:tinyint," +
-        "c:char(10),d1:date,d2:decimal(20,5),d3:double,fff:float,int:int," +
-        "l:array<bigint>,map:map<smallint,string>," +
-        "str:struct<u:uniontype<timestamp,varchar(100)>>,ts:timestamp>";
-    conf.set(OrcConf.SCHEMA.getAttribute(), typeStr);
-    FileOutputFormat.setOutputPath(conf, workDir);
-    TypeDescription type = TypeDescription.fromString(typeStr);
-
-    // build a row object
-    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
-    ((BytesWritable) row.getFieldValue(0)).set(new byte[]{1,2,3,4}, 0, 4);
-    ((BooleanWritable) row.getFieldValue(1)).set(true);
-    ((ByteWritable) row.getFieldValue(2)).set((byte) 23);
-    ((Text) row.getFieldValue(3)).set("aaabbbcccddd");
-    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-    ((DateWritable) row.getFieldValue(4)).set(DateWritable.millisToDays
-        (format.parse("2016-04-01").getTime()));
-    ((HiveDecimalWritable) row.getFieldValue(5)).set(new HiveDecimalWritable("1.23"));
-    ((DoubleWritable) row.getFieldValue(6)).set(1.5);
-    ((FloatWritable) row.getFieldValue(7)).set(4.5f);
-    ((IntWritable) row.getFieldValue(8)).set(31415);
-    OrcList<LongWritable> longList = (OrcList<LongWritable>) row.getFieldValue(9);
-    longList.add(new LongWritable(123));
-    longList.add(new LongWritable(456));
-    OrcMap<ShortWritable,Text> map = (OrcMap<ShortWritable,Text>) row.getFieldValue(10);
-    map.put(new ShortWritable((short) 1000), new Text("aaaa"));
-    map.put(new ShortWritable((short) 123), new Text("bbbb"));
-    OrcStruct struct = (OrcStruct) row.getFieldValue(11);
-    OrcUnion union = (OrcUnion) struct.getFieldValue(0);
-    union.set((byte) 1, new Text("abcde"));
-    ((OrcTimestamp) row.getFieldValue(12)).set("1996-12-11 15:00:00");
-    NullWritable nada = NullWritable.get();
-    RecordWriter<NullWritable, OrcStruct> writer =
-        new OrcOutputFormat<OrcStruct>().getRecordWriter(fs, conf, "all.orc",
-            Reporter.NULL);
-    for(int r=0; r < 10; ++r) {
-      row.setFieldValue(8, new IntWritable(r * 10));
-      writer.write(nada, row);
-    }
-    union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
-    for(int r=0; r < 10; ++r) {
-      row.setFieldValue(8, new IntWritable(r * 10 + 100));
-      writer.write(nada, row);
-    }
-    OrcStruct row2 = new OrcStruct(type);
-    writer.write(nada, row2);
-    row.setFieldValue(8, new IntWritable(210));
-    writer.write(nada, row);
-    writer.close(Reporter.NULL);
-
-    FileSplit split = new FileSplit(new Path(workDir, "all.orc"), 0, 100000,
-        new String[0]);
-    RecordReader<NullWritable, OrcStruct> reader =
-        new OrcInputFormat<OrcStruct>().getRecordReader(split, conf,
-            Reporter.NULL);
-    nada = reader.createKey();
-    row = reader.createValue();
-    for(int r=0; r < 22; ++r) {
-      assertEquals(true, reader.next(nada, row));
-      if (r == 20) {
-        for(int c=0; c < 12; ++c) {
-          assertEquals(null, row.getFieldValue(c));
-        }
-      } else {
-        assertEquals(new BytesWritable(new byte[]{1, 2, 3, 4}), row.getFieldValue(0));
-        assertEquals(new BooleanWritable(true), row.getFieldValue(1));
-        assertEquals(new ByteWritable((byte) 23), row.getFieldValue(2));
-        assertEquals(new Text("aaabbbcccd"), row.getFieldValue(3));
-        assertEquals(new DateWritable(DateWritable.millisToDays
-            (format.parse("2016-04-01").getTime())), row.getFieldValue(4));
-        assertEquals(new HiveDecimalWritable("1.23"), row.getFieldValue(5));
-        assertEquals(new DoubleWritable(1.5), row.getFieldValue(6));
-        assertEquals(new FloatWritable(4.5f), row.getFieldValue(7));
-        assertEquals(new IntWritable(r * 10), row.getFieldValue(8));
-        assertEquals(longList, row.getFieldValue(9));
-        assertEquals(map, row.getFieldValue(10));
-        if (r < 10) {
-          union.set((byte) 1, new Text("abcde"));
-        } else {
-          union.set((byte) 0, new OrcTimestamp("2011-12-25 12:34:56"));
-        }
-        assertEquals("row " + r, struct, row.getFieldValue(11));
-        assertEquals("row " + r, new OrcTimestamp("1996-12-11 15:00:00"),
-            row.getFieldValue(12));
-      }
-    }
-    assertEquals(false, reader.next(nada, row));
-  }
-
-  /**
-   * Test the case where the top level isn't a struct, but a long.
-   */
-  @Test
-  public void testLongRoot() throws Exception {
-    conf.set("mapreduce.task.attempt.id", "attempt_20160101_0001_m_000001_0");
-    conf.setOutputCommitter(NullOutputCommitter.class);
-    conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
-    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
-    conf.setInt(OrcConf.BUFFER_SIZE.getAttribute(), 64 * 1024);
-    conf.set(OrcConf.WRITE_FORMAT.getAttribute(), "0.11");
-    final String typeStr = "bigint";
-    conf.set(OrcConf.SCHEMA.getAttribute(), typeStr);
-    FileOutputFormat.setOutputPath(conf, workDir);
-    TypeDescription type = TypeDescription.fromString(typeStr);
-    LongWritable value = new LongWritable();
-    NullWritable nada = NullWritable.get();
-    RecordWriter<NullWritable, LongWritable> writer =
-        new OrcOutputFormat<LongWritable>().getRecordWriter(fs, conf,
-            "long.orc", Reporter.NULL);
-    for(long lo=0; lo < 2000; ++lo) {
-      value.set(lo);
-      writer.write(nada, value);
-    }
-    writer.close(Reporter.NULL);
-
-    Path path = new Path(workDir, "long.orc");
-    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
-    assertEquals(CompressionKind.SNAPPY, file.getCompressionKind());
-    assertEquals(2000, file.getNumberOfRows());
-    assertEquals(1000, file.getRowIndexStride());
-    assertEquals(64 * 1024, file.getCompressionSize());
-    assertEquals(OrcFile.Version.V_0_11, file.getFileVersion());
-    FileSplit split = new FileSplit(path, 0, 100000,
-        new String[0]);
-    RecordReader<NullWritable, LongWritable> reader =
-        new OrcInputFormat<LongWritable>().getRecordReader(split, conf,
-            Reporter.NULL);
-    nada = reader.createKey();
-    value = reader.createValue();
-    for(long lo=0; lo < 2000; ++lo) {
-      assertEquals(true, reader.next(nada, value));
-      assertEquals(lo, value.get());
-    }
-    assertEquals(false, reader.next(nada, value));
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
new file mode 100644
index 0000000..27543c1
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestMapreduceOrcOutputFormat {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir",
+      "target" + File.separator + "test" + File.separator + "tmp"));
+  JobConf conf = new JobConf();
+  FileSystem fs;
+
+  {
+    try {
+      fs = FileSystem.getLocal(conf).getRaw();
+      fs.delete(workDir, true);
+      fs.mkdirs(workDir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+  }
+
+  @Test
+  public void testPredicatePushdown() throws Exception {
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    final String typeStr = "struct<i:int,s:string>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    OutputFormat<NullWritable, OrcStruct> outputFormat =
+        new OrcOutputFormat<OrcStruct>();
+    RecordWriter<NullWritable, OrcStruct> writer =
+        outputFormat.getRecordWriter(attemptContext);
+
+    // write 4000 rows with the integer and the binary string
+    TypeDescription type = TypeDescription.fromString(typeStr);
+    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 4000; ++r) {
+      row.setFieldValue(0, new IntWritable(r));
+      row.setFieldValue(1, new Text(Integer.toBinaryString(r)));
+      writer.write(nada, row);
+    }
+    writer.close(attemptContext);
+
+    OrcInputFormat.setSearchArgument(conf,
+        SearchArgumentFactory.newBuilder()
+            .between("i", PredicateLeaf.Type.LONG, new Long(1500), new Long(1999))
+            .build(), new String[]{null, "i", "s"});
+    FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"),
+        0, 1000000, new String[0]);
+    RecordReader<NullWritable, OrcStruct> reader =
+        new OrcInputFormat<OrcStruct>().createRecordReader(split,
+            attemptContext);
+    // with a 1000 row stride, the sarg limits the reader to rows 1000 through 1999
+    for(int r=1000; r < 2000; ++r) {
+      assertEquals(true, reader.nextKeyValue());
+      row = reader.getCurrentValue();
+      assertEquals(r, ((IntWritable) row.getFieldValue(0)).get());
+      assertEquals(Integer.toBinaryString(r), row.getFieldValue(1).toString());
+    }
+    assertEquals(false, reader.nextKeyValue());
+  }
+
+  @Test
+  public void testColumnSelection() throws Exception {
+    String typeStr = "struct<i:int,j:int,k:int>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), 1000);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    OutputFormat<NullWritable, OrcStruct> outputFormat =
+        new OrcOutputFormat<OrcStruct>();
+    RecordWriter<NullWritable, OrcStruct> writer =
+        outputFormat.getRecordWriter(attemptContext);
+
+    // write 3000 rows with three int columns: r, 2*r, and 3*r
+    TypeDescription type = TypeDescription.fromString(typeStr);
+    OrcStruct row = (OrcStruct) OrcStruct.createValue(type);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 3000; ++r) {
+      row.setFieldValue(0, new IntWritable(r));
+      row.setFieldValue(1, new IntWritable(r * 2));
+      row.setFieldValue(2, new IntWritable(r * 3));
+      writer.write(nada, row);
+    }
+    writer.close(attemptContext);
+
+    conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute(), "0,2");
+    FileSplit split = new FileSplit(new Path(workDir, "part-m-00000.orc"),
+        0, 1000000, new String[0]);
+    RecordReader<NullWritable, OrcStruct> reader =
+        new OrcInputFormat<OrcStruct>().createRecordReader(split,
+            attemptContext);
+    // no sarg here; all 3000 rows come back, but only columns 0 and 2 are read
+    for(int r=0; r < 3000; ++r) {
+      assertEquals(true, reader.nextKeyValue());
+      row = reader.getCurrentValue();
+      assertEquals(r, ((IntWritable) row.getFieldValue(0)).get());
+      assertEquals(null, row.getFieldValue(1));
+      assertEquals(r * 3, ((IntWritable) row.getFieldValue(2)).get());
+    }
+    assertEquals(false, reader.nextKeyValue());
+  }
+
+
+  /**
+   * Make sure that the writer ignores the OrcKey
+   * @throws Exception
+   */
+  @Test
+  public void testOrcKey() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    String TYPE_STRING = "struct<i:int,s:string>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcKey key = new OrcKey(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(attemptContext);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 2000; ++r) {
+      ((OrcStruct) key.key).setAllFields(new IntWritable(r),
+          new Text(Integer.toString(r)));
+      writer.write(nada, key);
+    }
+    writer.close(attemptContext);
+    Path path = new Path(workDir, "part-m-00000.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(2000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+
+  /**
+   * Make sure that the writer ignores the OrcValue
+   * @throws Exception
+   */
+  @Test
+  public void testOrcValue() throws Exception {
+    conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
+    String TYPE_STRING = "struct<i:int>";
+    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
+    conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
+    TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+    TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+
+    TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
+    OrcValue value = new OrcValue(new OrcStruct(schema));
+    RecordWriter<NullWritable, Writable> writer =
+        new OrcOutputFormat<>().getRecordWriter(attemptContext);
+    NullWritable nada = NullWritable.get();
+    for(int r=0; r < 3000; ++r) {
+      ((OrcStruct) value.value).setAllFields(new IntWritable(r));
+      writer.write(nada, value);
+    }
+    writer.close(attemptContext);
+    Path path = new Path(workDir, "part-m-00000.orc");
+    Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(3000, file.getNumberOfRows());
+    assertEquals(TYPE_STRING, file.getSchema().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
new file mode 100644
index 0000000..01208e1
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMrUnit.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcValue;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+public class TestMrUnit {
+  JobConf conf = new JobConf();
+
+  /**
+   * Split the input struct into its two parts.
+   */
+  public static class MyMapper
+      extends Mapper<NullWritable, OrcStruct, OrcKey, OrcValue> {
+    private OrcKey keyWrapper = new OrcKey();
+    private OrcValue valueWrapper = new OrcValue();
+
+    @Override
+    protected void map(NullWritable key,
+                       OrcStruct value,
+                       Context context
+                       ) throws IOException, InterruptedException {
+      keyWrapper.key = value.getFieldValue(0);
+      valueWrapper.value = value.getFieldValue(1);
+      context.write(keyWrapper, valueWrapper);
+    }
+  }
+
+  /**
+   * Glue the key and values back together.
+   */
+  public static class MyReducer
+      extends Reducer<OrcKey, OrcValue, NullWritable, OrcStruct> {
+    private OrcStruct output = new OrcStruct(TypeDescription.fromString
+        ("struct<first:struct<x:int,y:int>,second:struct<z:string>>"));
+    private final NullWritable nada = NullWritable.get();
+
+    @Override
+    protected void reduce(OrcKey key,
+                          Iterable<OrcValue> values,
+                          Context context
+                          ) throws IOException, InterruptedException {
+      output.setFieldValue(0, key.key);
+      for(OrcValue value: values) {
+        output.setFieldValue(1, value.value);
+        context.write(nada, output);
+      }
+    }
+  }
+
+  /**
+   * This class is intended to support MRUnit's object copying for input and
+   * output objects.
+   *
+   * Real mapreduce contexts should NEVER use this class.
+   *
+   * The type string is serialized before each value.
+   */
+  public static class OrcStructSerialization
+      implements Serialization<OrcStruct> {
+
+    @Override
+    public boolean accept(Class<?> cls) {
+      return OrcStruct.class.isAssignableFrom(cls);
+    }
+
+    @Override
+    public Serializer<OrcStruct> getSerializer(Class<OrcStruct> aClass) {
+      return new Serializer<OrcStruct>() {
+        DataOutputStream dataOut;
+
+        public void open(OutputStream out) {
+          if(out instanceof DataOutputStream) {
+            dataOut = (DataOutputStream)out;
+          } else {
+            dataOut = new DataOutputStream(out);
+          }
+        }
+
+        public void serialize(OrcStruct w) throws IOException {
+          Text.writeString(dataOut, w.getSchema().toString());
+          w.write(dataOut);
+        }
+
+        public void close() throws IOException {
+          dataOut.close();
+        }
+      };
+    }
+
+    @Override
+    public Deserializer<OrcStruct> getDeserializer(Class<OrcStruct> aClass) {
+      return new Deserializer<OrcStruct>() {
+        DataInputStream input;
+
+        @Override
+        public void open(InputStream inputStream) throws IOException {
+          if(inputStream instanceof DataInputStream) {
+            input = (DataInputStream)inputStream;
+          } else {
+            input = new DataInputStream(inputStream);
+          }
+        }
+
+        @Override
+        public OrcStruct deserialize(OrcStruct orcStruct) throws IOException {
+          String typeStr = Text.readString(input);
+          OrcStruct result = new OrcStruct(TypeDescription.fromString(typeStr));
+          result.readFields(input);
+          return result;
+        }
+
+        @Override
+        public void close() throws IOException {
+          // PASS
+        }
+      };
+    }
+  }
+
+  @Test
+  public void testMapred() throws IOException {
+    conf.set("io.serializations",
+        OrcStructSerialization.class.getName() + "," +
+            WritableSerialization.class.getName());
+    OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
+    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
+    MyMapper mapper = new MyMapper();
+    MyReducer reducer = new MyReducer();
+    MapReduceDriver<NullWritable, OrcStruct,
+                    OrcKey, OrcValue,
+                    NullWritable, OrcStruct> driver =
+        new MapReduceDriver<>(mapper, reducer);
+    driver.setConfiguration(conf);
+    NullWritable nada = NullWritable.get();
+    OrcStruct input = (OrcStruct) OrcStruct.createValue(
+        TypeDescription.fromString("struct<one:struct<x:int,y:int>,two:struct<z:string>>"));
+    IntWritable x =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(0);
+    IntWritable y =
+        (IntWritable) ((OrcStruct) input.getFieldValue(0)).getFieldValue(1);
+    Text z = (Text) ((OrcStruct) input.getFieldValue(1)).getFieldValue(0);
+
+    // generate the input stream
+    for(int r=0; r < 20; ++r) {
+      x.set(100 -  (r / 4));
+      y.set(r*2);
+      z.set(Integer.toHexString(r));
+      driver.withInput(nada, input);
+    }
+
+    // generate the expected outputs
+    for(int g=4; g >= 0; --g) {
+      x.set(100 - g);
+      for(int i=0; i < 4; ++i) {
+        int r = g * 4 + i;
+        y.set(r * 2);
+        z.set(Integer.toHexString(r));
+        driver.withOutput(nada, input);
+      }
+    }
+    driver.runTest();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 9941dee..2eacd7a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -47,6 +47,7 @@
     <junit.version>4.11</junit.version>
     <kryo.version>3.0.3</kryo.version>
     <mockito.version>1.9.5</mockito.version>
+    <mrunit.version>1.1.0</mrunit.version>
     <protobuf.version>2.5.0</protobuf.version>
     <slf4j.version>1.7.5</slf4j.version>
     <snappy.version>0.2</snappy.version>


[2/2] orc git commit: ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and OutputFormat. (omalley)

Posted by om...@apache.org.
ORC-52 Add support for org.apache.hadoop.mapreduce InputFormat and
OutputFormat. (omalley)

Fixes #27

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/3bb5ce53
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/3bb5ce53
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/3bb5ce53

Branch: refs/heads/master
Commit: 3bb5ce532180fcaa03fa6d13d5829a18a153ad6e
Parents: 545fe37
Author: Owen O'Malley <om...@apache.org>
Authored: Tue May 24 14:28:36 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed Jun 1 20:56:28 2016 -0700

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcConf.java  |  31 +-
 java/mapreduce/pom.xml                          |   7 +
 .../org/apache/orc/mapred/OrcInputFormat.java   |  62 ++-
 .../src/java/org/apache/orc/mapred/OrcKey.java  |  90 +++
 .../src/java/org/apache/orc/mapred/OrcMap.java  |   1 -
 .../orc/mapred/OrcMapredRecordReader.java       | 551 +++++++++++++++++++
 .../orc/mapred/OrcMapredRecordWriter.java       | 283 ++++++++++
 .../org/apache/orc/mapred/OrcOutputFormat.java  |  45 +-
 .../org/apache/orc/mapred/OrcRecordReader.java  | 547 ------------------
 .../org/apache/orc/mapred/OrcRecordWriter.java  | 277 ----------
 .../java/org/apache/orc/mapred/OrcStruct.java   |   8 +
 .../org/apache/orc/mapred/OrcTimestamp.java     |   2 +-
 .../java/org/apache/orc/mapred/OrcValue.java    |  69 +++
 .../apache/orc/mapreduce/OrcInputFormat.java    |  71 +++
 .../orc/mapreduce/OrcMapreduceRecordReader.java | 119 ++++
 .../orc/mapreduce/OrcMapreduceRecordWriter.java |  83 +++
 .../apache/orc/mapreduce/OrcOutputFormat.java   |  70 +++
 .../test/org/apache/orc/mapred/TestMrUnit.java  | 223 ++++++++
 .../apache/orc/mapred/TestOrcOutputFormat.java  | 299 ++++++++++
 .../org/apache/orc/mapred/TestOrcStruct.java    |   5 +
 .../orc/mapred/other/TestOrcOutputFormat.java   | 249 ---------
 .../mapreduce/TestMapreduceOrcOutputFormat.java | 214 +++++++
 .../org/apache/orc/mapreduce/TestMrUnit.java    | 203 +++++++
 java/pom.xml                                    |   1 +
 24 files changed, 2393 insertions(+), 1117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index df1b410..a575e18 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -101,9 +101,18 @@ public enum OrcConf {
       "The maximum size of the file to read for finding the file tail. This\n" +
           "is primarily used for streaming ingest to read intermediate\n" +
           "footers while the file is still open"),
-  SCHEMA("orc.schema", "orc.schema", null,
-      "The schema that the user desires to read or write. The values are\n" +
+  MAPRED_INPUT_SCHEMA("orc.mapred.input.schema", null, null,
+      "The schema that the user desires to read. The values are\n" +
       "interpreted using TypeDescription.fromString."),
+  MAPRED_SHUFFLE_KEY_SCHEMA("orc.mapred.map.output.key.schema", null, null,
+      "The schema of the MapReduce shuffle key. The values are\n" +
+          "interpreted using TypeDescription.fromString."),
+  MAPRED_SHUFFLE_VALUE_SCHEMA("orc.mapred.map.output.value.schema", null, null,
+      "The schema of the MapReduce shuffle value. The values are\n" +
+          "interpreted using TypeDescription.fromString."),
+  MAPRED_OUTPUT_SCHEMA("orc.mapred.output.schema", null, null,
+      "The schema that the user desires to write. The values are\n" +
+          "interpreted using TypeDescription.fromString."),
   INCLUDE_COLUMNS("orc.include.columns", "hive.io.file.readcolumn.ids", null,
       "The list of comma separated column ids that should be read with 0\n" +
           "being the first column, 1 being the next, and so on. ."),
@@ -151,7 +160,7 @@ public enum OrcConf {
     }
     if (result == null && conf != null) {
       result = conf.get(attribute);
-      if (result == null) {
+      if (result == null && hiveConfName != null) {
         result = conf.get(hiveConfName);
       }
     }
@@ -170,6 +179,10 @@ public enum OrcConf {
     return getLong(null, conf);
   }
 
+  public void setLong(Configuration conf, long value) {
+    conf.setLong(attribute, value);
+  }
+
   public String getString(Properties tbl, Configuration conf) {
     String value = lookupValue(tbl, conf);
     return value == null ? (String) defaultValue : value;
@@ -179,6 +192,10 @@ public enum OrcConf {
     return getString(null, conf);
   }
 
+  public void setString(Configuration conf, String value) {
+    conf.set(attribute, value);
+  }
+
   public boolean getBoolean(Properties tbl, Configuration conf) {
     String value = lookupValue(tbl, conf);
     if (value != null) {
@@ -191,6 +208,10 @@ public enum OrcConf {
     return getBoolean(null, conf);
   }
 
+  public void setBoolean(Configuration conf, boolean value) {
+    conf.setBoolean(attribute, value);
+  }
+
   public double getDouble(Properties tbl, Configuration conf) {
     String value = lookupValue(tbl, conf);
     if (value != null) {
@@ -202,4 +223,8 @@ public enum OrcConf {
   public double getDouble(Configuration conf) {
     return getDouble(null, conf);
   }
+
+  public void setDouble(Configuration conf, double value) {
+    conf.setDouble(attribute, value);
+  }
 }
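
The new typed setters mirror the existing getters and write through to the ORC attribute name, so job setup code no longer needs the raw property strings. A short sketch of the style, not part of this patch; the class name and values are only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcConf;

public class OrcConfExample {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Schema strings are parsed with TypeDescription.fromString.
    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, "struct<x:int,y:int>");
    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<x:int,y:int>");
    OrcConf.COMPRESS.setString(conf, "ZLIB");
    OrcConf.ROW_INDEX_STRIDE.setLong(conf, 10000);
    OrcConf.BLOCK_PADDING.setBoolean(conf, true);
    OrcConf.BLOOM_FILTER_FPP.setDouble(conf, 0.05);
    return conf;
  }
}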

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/java/mapreduce/pom.xml b/java/mapreduce/pom.xml
index 3b38a40..0b48c82 100644
--- a/java/mapreduce/pom.xml
+++ b/java/mapreduce/pom.xml
@@ -118,6 +118,13 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.mrunit</groupId>
+      <artifactId>mrunit</artifactId>
+      <version>${mrunit.version}</version>
+      <scope>test</scope>
+      <classifier>hadoop2</classifier>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <version>${mockito.version}</version>

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
index 78e75f7..ac8ca61 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
@@ -25,7 +25,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.RecordReader;
@@ -46,7 +46,7 @@ import org.apache.orc.TypeDescription;
 /**
  * A MapReduce/Hive input format for ORC files.
  */
-public class OrcInputFormat<V extends Writable>
+public class OrcInputFormat<V extends WritableComparable>
     extends FileInputFormat<NullWritable, V> {
 
   /**
@@ -56,7 +56,8 @@ public class OrcInputFormat<V extends Writable>
    * @param columnsStr the comma separated list of column ids
    * @return a boolean array
    */
-  static boolean[] parseInclude(TypeDescription schema, String columnsStr) {
+  public static boolean[] parseInclude(TypeDescription schema,
+                                       String columnsStr) {
     if (columnsStr == null ||
         schema.getCategory() != TypeDescription.Category.STRUCT) {
       return null;
@@ -82,39 +83,41 @@ public class OrcInputFormat<V extends Writable>
   public static void setSearchArgument(Configuration conf,
                                        SearchArgument sarg,
                                        String[] columnNames) {
-    Output out = new Output();
+    Output out = new Output(100000);
     new Kryo().writeObject(out, sarg);
-    conf.set(OrcConf.KRYO_SARG.getAttribute(),
-        Base64.encodeBase64String(out.toBytes()));
+    OrcConf.KRYO_SARG.setString(conf, Base64.encodeBase64String(out.toBytes()));
     StringBuilder buffer = new StringBuilder();
-    for(int i=0; i < columnNames.length; ++i) {
+    for (int i = 0; i < columnNames.length; ++i) {
       if (i != 0) {
         buffer.append(',');
       }
       buffer.append(columnNames[i]);
     }
-    conf.set(OrcConf.SARG_COLUMNS.getAttribute(), buffer.toString());
+    OrcConf.SARG_COLUMNS.setString(conf, buffer.toString());
   }
 
-  @Override
-  public RecordReader<NullWritable, V>
-  getRecordReader(InputSplit inputSplit,
-                  JobConf conf,
-                  Reporter reporter) throws IOException {
-    FileSplit split = (FileSplit) inputSplit;
-    Reader file = OrcFile.createReader(split.getPath(),
-        OrcFile.readerOptions(conf)
-            .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
+  /**
+   * Build the Reader.Options object based on the JobConf and the range of
+   * bytes.
+   * @param conf the job configuration
+   * @param start the byte offset to start reading at
+   * @param length the number of bytes to read
+   * @return the options to read with
+   */
+  public static Reader.Options buildOptions(Configuration conf,
+                                            Reader reader,
+                                            long start,
+                                            long length) {
     TypeDescription schema =
-        TypeDescription.fromString(OrcConf.SCHEMA.getString(conf));
+        TypeDescription.fromString(OrcConf.MAPRED_INPUT_SCHEMA.getString(conf));
     Reader.Options options = new Reader.Options()
-        .range(split.getStart(), split.getLength())
+        .range(start, length)
         .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
         .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf));
-    if (schema == null) {
-      schema = file.getSchema();
-    } else {
+    if (schema != null) {
       options.schema(schema);
+    } else {
+      schema = reader.getSchema();
     }
     options.include(parseInclude(schema,
         OrcConf.INCLUDE_COLUMNS.getString(conf)));
@@ -126,6 +129,19 @@ public class OrcInputFormat<V extends Writable>
           new Kryo().readObject(new Input(sargBytes), SearchArgumentImpl.class);
       options.searchArgument(sarg, sargColumns.split(","));
     }
-    return new OrcRecordReader(file, options);
+    return options;
+  }
+
+  @Override
+  public RecordReader<NullWritable, V>
+  getRecordReader(InputSplit inputSplit,
+                  JobConf conf,
+                  Reporter reporter) throws IOException {
+    FileSplit split = (FileSplit) inputSplit;
+    Reader file = OrcFile.createReader(split.getPath(),
+        OrcFile.readerOptions(conf)
+            .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
+    return new OrcMapredRecordReader<>(file, buildOptions(conf,
+        file, split.getStart(), split.getLength()));
   }
 }
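
With buildOptions factored out, a job only needs to publish the read schema and the column list through OrcConf and the record reader picks them up from the split. A hedged sketch of the old-API wiring follows; the input path and schema are assumptions. A SearchArgument can additionally be pushed down with the now-public setSearchArgument helper.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.OrcConf;
import org.apache.orc.mapred.OrcInputFormat;

public class ReadOrcJobSetup {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    conf.setInputFormat(OrcInputFormat.class);
    // Request a two column view of the file and read both columns.
    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, "struct<x:int,y:int>");
    OrcConf.INCLUDE_COLUMNS.setString(conf, "0,1");
    FileInputFormat.setInputPaths(conf, new Path("/data/in"));
    return conf;
  }
}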

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java
new file mode 100644
index 0000000..ad94e32
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcKey.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This type provides a wrapper for OrcStruct so that it can be sent through
+ * the MapReduce shuffle as a key.
+ *
+ * The user should set the orc.mapred.map.output.key.schema property in the
+ * JobConf to the type string of the key.
+ */
+public final class OrcKey
+    implements WritableComparable<OrcKey>, JobConfigurable {
+
+  public WritableComparable key;
+
+  public OrcKey(WritableComparable key) {
+    this.key = key;
+  }
+
+  public OrcKey() {
+    key = null;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    key.write(dataOutput);
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    key.readFields(dataInput);
+  }
+
+  @Override
+  public void configure(JobConf conf) {
+    if (key == null) {
+      TypeDescription schema =
+          TypeDescription.fromString(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA
+              .getString(conf));
+      key = OrcStruct.createValue(schema);
+    }
+  }
+
+  @Override
+  public int compareTo(OrcKey o) {
+    return key.compareTo(o.key);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || key == null) {
+      return false;
+    } else if (o.getClass() != getClass()) {
+      return false;
+    } else {
+      return key.equals(((OrcKey) o).key);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return key.hashCode();
+  }
+}
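
OrcKey, like its OrcValue counterpart, only carries the wrapped object through the shuffle; the shuffle schema has to be published so that an empty instance can be created when the framework configures the wrapper. A minimal sketch of the relevant JobConf settings, with illustrative schema strings:

import org.apache.hadoop.mapred.JobConf;
import org.apache.orc.OrcConf;
import org.apache.orc.mapred.OrcKey;
import org.apache.orc.mapred.OrcValue;

public class ShuffleSetup {
  public static void configure(JobConf conf) {
    conf.setMapOutputKeyClass(OrcKey.class);
    conf.setMapOutputValueClass(OrcValue.class);
    // Parsed with TypeDescription.fromString when the wrappers are configured.
    OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.setString(conf, "struct<x:int,y:int>");
    OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.setString(conf, "struct<z:string>");
  }
}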

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
index cf47827..38c152c 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
@@ -45,7 +45,6 @@ public final class OrcMap<K extends WritableComparable,
 
   @Override
   public void write(DataOutput output) throws IOException {
-    Iterator<Map.Entry<K,V>> itr = entrySet().iterator();
     output.writeInt(size());
     for(Map.Entry<K,V> entry: entrySet()) {
       K key = entry.getKey();

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
new file mode 100644
index 0000000..ddbc396
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+/**
+ * This record reader implements the org.apache.hadoop.mapred API.
+ * @param <V> the root type of the file
+ */
+public class OrcMapredRecordReader<V extends WritableComparable>
+    implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> {
+  private final TypeDescription schema;
+  private final RecordReader batchReader;
+  private final VectorizedRowBatch batch;
+  private int rowInBatch;
+
+  protected OrcMapredRecordReader(Reader fileReader,
+                                  Reader.Options options) throws IOException {
+    this.batchReader = fileReader.rows(options);
+    if (options.getSchema() == null) {
+      schema = fileReader.getSchema();
+    } else {
+      schema = options.getSchema();
+    }
+    this.batch = schema.createRowBatch();
+    rowInBatch = 0;
+  }
+
+  /**
+   * If the current batch is empty, get a new one.
+   * @return true if we have rows available.
+   * @throws IOException
+   */
+  boolean ensureBatch() throws IOException {
+    if (rowInBatch >= batch.size) {
+      rowInBatch = 0;
+      return batchReader.nextBatch(batch);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next(NullWritable key, V value) throws IOException {
+    if (!ensureBatch()) {
+      return false;
+    }
+    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+      OrcStruct result = (OrcStruct) value;
+      List<TypeDescription> children = schema.getChildren();
+      int numberOfChildren = children.size();
+      for(int i=0; i < numberOfChildren; ++i) {
+        result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
+            children.get(i), result.getFieldValue(i)));
+      }
+    } else {
+      nextValue(batch.cols[0], rowInBatch, schema, value);
+    }
+    rowInBatch += 1;
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public V createValue() {
+    return (V) OrcStruct.createValue(schema);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    batchReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  static BooleanWritable nextBoolean(ColumnVector vector,
+                                     int row,
+                                     Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      BooleanWritable result;
+      if (previous == null || previous.getClass() != BooleanWritable.class) {
+        result = new BooleanWritable();
+      } else {
+        result = (BooleanWritable) previous;
+      }
+      result.set(((LongColumnVector) vector).vector[row] != 0);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static ByteWritable nextByte(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ByteWritable result;
+      if (previous == null || previous.getClass() != ByteWritable.class) {
+        result = new ByteWritable();
+      } else {
+        result = (ByteWritable) previous;
+      }
+      result.set((byte) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static ShortWritable nextShort(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ShortWritable result;
+      if (previous == null || previous.getClass() != ShortWritable.class) {
+        result = new ShortWritable();
+      } else {
+        result = (ShortWritable) previous;
+      }
+      result.set((short) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static IntWritable nextInt(ColumnVector vector,
+                             int row,
+                             Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      IntWritable result;
+      if (previous == null || previous.getClass() != IntWritable.class) {
+        result = new IntWritable();
+      } else {
+        result = (IntWritable) previous;
+      }
+      result.set((int) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static LongWritable nextLong(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      LongWritable result;
+      if (previous == null || previous.getClass() != LongWritable.class) {
+        result = new LongWritable();
+      } else {
+        result = (LongWritable) previous;
+      }
+      result.set(((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static FloatWritable nextFloat(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      FloatWritable result;
+      if (previous == null || previous.getClass() != FloatWritable.class) {
+        result = new FloatWritable();
+      } else {
+        result = (FloatWritable) previous;
+      }
+      result.set((float) ((DoubleColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static DoubleWritable nextDouble(ColumnVector vector,
+                                   int row,
+                                   Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      DoubleWritable result;
+      if (previous == null || previous.getClass() != DoubleWritable.class) {
+        result = new DoubleWritable();
+      } else {
+        result = (DoubleWritable) previous;
+      }
+      result.set(((DoubleColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static Text nextString(ColumnVector vector,
+                         int row,
+                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      Text result;
+      if (previous == null || previous.getClass() != Text.class) {
+        result = new Text();
+      } else {
+        result = (Text) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static BytesWritable nextBinary(ColumnVector vector,
+                                  int row,
+                                  Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      BytesWritable result;
+      if (previous == null || previous.getClass() != BytesWritable.class) {
+        result = new BytesWritable();
+      } else {
+        result = (BytesWritable) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static HiveDecimalWritable nextDecimal(ColumnVector vector,
+                                         int row,
+                                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveDecimalWritable result;
+      if (previous == null || previous.getClass() != HiveDecimalWritable.class) {
+        result = new HiveDecimalWritable();
+      } else {
+        result = (HiveDecimalWritable) previous;
+      }
+      result.set(((DecimalColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static DateWritable nextDate(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      DateWritable result;
+      if (previous == null || previous.getClass() != DateWritable.class) {
+        result = new DateWritable();
+      } else {
+        result = (DateWritable) previous;
+      }
+      int date = (int) ((LongColumnVector) vector).vector[row];
+      result.set(date);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcTimestamp nextTimestamp(ColumnVector vector,
+                                    int row,
+                                    Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcTimestamp result;
+      if (previous == null || previous.getClass() != OrcTimestamp.class) {
+        result = new OrcTimestamp();
+      } else {
+        result = (OrcTimestamp) previous;
+      }
+      TimestampColumnVector tcv = (TimestampColumnVector) vector;
+      result.setTime(tcv.time[row]);
+      result.setNanos(tcv.nanos[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcStruct nextStruct(ColumnVector vector,
+                              int row,
+                              TypeDescription schema,
+                              Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcStruct result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      int numChildren = childrenTypes.size();
+      if (previous == null || previous.getClass() != OrcStruct.class) {
+        result = new OrcStruct(schema);
+      } else {
+        result = (OrcStruct) previous;
+      }
+      StructColumnVector struct = (StructColumnVector) vector;
+      for(int f=0; f < numChildren; ++f) {
+        result.setFieldValue(f, nextValue(struct.fields[f], row,
+            childrenTypes.get(f), result.getFieldValue(f)));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcUnion nextUnion(ColumnVector vector,
+                            int row,
+                            TypeDescription schema,
+                            Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcUnion result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      if (previous == null || previous.getClass() != OrcUnion.class) {
+        result = new OrcUnion(schema);
+      } else {
+        result = (OrcUnion) previous;
+      }
+      UnionColumnVector union = (UnionColumnVector) vector;
+      byte tag = (byte) union.tags[row];
+      result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
+          result.getObject()));
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcList nextList(ColumnVector vector,
+                          int row,
+                          TypeDescription schema,
+                          Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcList result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      TypeDescription valueType = childrenTypes.get(0);
+      if (previous == null ||
+          previous.getClass() != ArrayList.class) {
+        result = new OrcList(schema);
+      } else {
+        result = (OrcList) previous;
+      }
+      ListColumnVector list = (ListColumnVector) vector;
+      int length = (int) list.lengths[row];
+      int offset = (int) list.offsets[row];
+      result.ensureCapacity(length);
+      int oldLength = result.size();
+      int idx = 0;
+      while (idx < length && idx < oldLength) {
+        result.set(idx, nextValue(list.child, offset + idx, valueType,
+            result.get(idx)));
+        idx += 1;
+      }
+      if (length < oldLength) {
+        for(int i= oldLength - 1; i >= length; --i) {
+          result.remove(i);
+        }
+      } else if (oldLength < length) {
+        while (idx < length) {
+          result.add(nextValue(list.child, offset + idx, valueType, null));
+          idx += 1;
+        }
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcMap nextMap(ColumnVector vector,
+                        int row,
+                        TypeDescription schema,
+                        Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      MapColumnVector map = (MapColumnVector) vector;
+      int length = (int) map.lengths[row];
+      int offset = (int) map.offsets[row];
+      OrcMap result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      TypeDescription keyType = childrenTypes.get(0);
+      TypeDescription valueType = childrenTypes.get(1);
+      if (previous == null ||
+          previous.getClass() != OrcMap.class) {
+        result = new OrcMap(schema);
+      } else {
+        result = (OrcMap) previous;
+        // I couldn't think of a good way to reuse the keys and value objects
+        // without even more allocations, so take the easy and safe approach.
+        result.clear();
+      }
+      for(int e=0; e < length; ++e) {
+        result.put(nextValue(map.keys, e + offset, keyType, null),
+                   nextValue(map.values, e + offset, valueType, null));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  public static WritableComparable nextValue(ColumnVector vector,
+                                             int row,
+                                             TypeDescription schema,
+                                             Object previous) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return nextBoolean(vector, row, previous);
+      case BYTE:
+        return nextByte(vector, row, previous);
+      case SHORT:
+        return nextShort(vector, row, previous);
+      case INT:
+        return nextInt(vector, row, previous);
+      case LONG:
+        return nextLong(vector, row, previous);
+      case FLOAT:
+        return nextFloat(vector, row, previous);
+      case DOUBLE:
+        return nextDouble(vector, row, previous);
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return nextString(vector, row, previous);
+      case BINARY:
+        return nextBinary(vector, row, previous);
+      case DECIMAL:
+        return nextDecimal(vector, row, previous);
+      case DATE:
+        return nextDate(vector, row, previous);
+      case TIMESTAMP:
+        return nextTimestamp(vector, row, previous);
+      case STRUCT:
+        return nextStruct(vector, row, schema, previous);
+      case UNION:
+        return nextUnion(vector, row, schema, previous);
+      case LIST:
+        return nextList(vector, row, schema, previous);
+      case MAP:
+        return nextMap(vector, row, schema, previous);
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema);
+    }
+  }
+}
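
Outside of a full job the reader can also be exercised straight through the input format, which shows the value-reuse contract: createValue() allocates once and next() refills the same object. A hedged sketch; the path and schema below are assumptions.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.OrcConf;
import org.apache.orc.mapred.OrcInputFormat;
import org.apache.orc.mapred.OrcStruct;

public class ReadOrcRows {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, "struct<x:int,y:int>");
    FileSplit split = new FileSplit(new Path("/data/in/part-00000.orc"),
        0, Long.MAX_VALUE, new String[0]);
    RecordReader<NullWritable, OrcStruct> rows =
        new OrcInputFormat<OrcStruct>().getRecordReader(split, conf, Reporter.NULL);
    NullWritable key = rows.createKey();
    OrcStruct row = rows.createValue();
    while (rows.next(key, row)) {
      // row is reused across iterations; copy any field that must outlive next().
    }
    rows.close();
  }
}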

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
new file mode 100644
index 0000000..59f89f7
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class OrcMapredRecordWriter<V extends Writable>
+    implements RecordWriter<NullWritable, V> {
+  private final Writer writer;
+  private final VectorizedRowBatch batch;
+  private final TypeDescription schema;
+  private final boolean isTopStruct;
+
+  public OrcMapredRecordWriter(Writer writer) {
+    this.writer = writer;
+    schema = writer.getSchema();
+    this.batch = schema.createRowBatch();
+    isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+  }
+
+  static void setLongValue(ColumnVector vector, int row, long value) {
+    ((LongColumnVector) vector).vector[row] = value;
+  }
+
+  static void setDoubleValue(ColumnVector vector, int row, double value) {
+    ((DoubleColumnVector) vector).vector[row] = value;
+  }
+
+  static void setBinaryValue(ColumnVector vector, int row,
+                             BinaryComparable value) {
+    ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
+        value.getLength());
+  }
+
+  static void setBinaryValue(ColumnVector vector, int row,
+                             BinaryComparable value, int maxLength) {
+    ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
+        Math.min(maxLength, value.getLength()));
+  }
+
+  private static final ThreadLocal<byte[]> SPACE_BUFFER =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          byte[] result = new byte[100];
+          Arrays.fill(result, (byte) ' ');
+          return result;
+        }
+      };
+
+  static void setCharValue(BytesColumnVector vector,
+                           int row,
+                           Text value,
+                           int length) {
+    // we need to trim or pad the string with spaces to the required length
+    int actualLength = value.getLength();
+    if (actualLength >= length) {
+      setBinaryValue(vector, row, value, length);
+    } else {
+      byte[] spaces = SPACE_BUFFER.get();
+      if (length - actualLength > spaces.length) {
+        spaces = new byte[length - actualLength];
+        Arrays.fill(spaces, (byte)' ');
+        SPACE_BUFFER.set(spaces);
+      }
+      vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0,
+          length - actualLength);
+    }
+  }
+
+  static void setStructValue(TypeDescription schema,
+                             StructColumnVector vector,
+                             int row,
+                             OrcStruct value) {
+    List<TypeDescription> children = schema.getChildren();
+    for(int c=0; c < value.getNumFields(); ++c) {
+      setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c));
+    }
+  }
+
+  static void setUnionValue(TypeDescription schema,
+                            UnionColumnVector vector,
+                            int row,
+                            OrcUnion value) {
+    List<TypeDescription> children = schema.getChildren();
+    int tag = value.getTag() & 0xff;
+    vector.tags[row] = tag;
+    setColumn(children.get(tag), vector.fields[tag], row, value.getObject());
+  }
+
+
+  static void setListValue(TypeDescription schema,
+                           ListColumnVector vector,
+                           int row,
+                           OrcList value) {
+    TypeDescription elemType = schema.getChildren().get(0);
+    vector.offsets[row] = vector.childCount;
+    vector.lengths[row] = value.size();
+    vector.childCount += vector.lengths[row];
+    vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0);
+    for(int e=0; e < vector.lengths[row]; ++e) {
+      setColumn(elemType, vector.child, (int) vector.offsets[row] + e,
+          (Writable) value.get(e));
+    }
+  }
+
+  static void setMapValue(TypeDescription schema,
+                          MapColumnVector vector,
+                          int row,
+                          OrcMap<?,?> value) {
+    TypeDescription keyType = schema.getChildren().get(0);
+    TypeDescription valueType = schema.getChildren().get(1);
+    vector.offsets[row] = vector.childCount;
+    vector.lengths[row] = value.size();
+    vector.childCount += vector.lengths[row];
+    vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0);
+    vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0);
+    int e = 0;
+    for(Map.Entry<?,?> entry: value.entrySet()) {
+      setColumn(keyType, vector.keys, (int) vector.offsets[row] + e,
+          (Writable) entry.getKey());
+      setColumn(valueType, vector.values, (int) vector.offsets[row] + e,
+          (Writable) entry.getValue());
+      e += 1;
+    }
+  }
+
+  public static void setColumn(TypeDescription schema,
+                               ColumnVector vector,
+                               int row,
+                               Writable value) {
+    if (value == null) {
+      vector.noNulls = false;
+      vector.isNull[row] = true;
+    } else {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          setLongValue(vector, row, ((BooleanWritable) value).get() ? 1 : 0);
+          break;
+        case BYTE:
+          setLongValue(vector, row, ((ByteWritable) value).get());
+          break;
+        case SHORT:
+          setLongValue(vector, row, ((ShortWritable) value).get());
+          break;
+        case INT:
+          setLongValue(vector, row, ((IntWritable) value).get());
+          break;
+        case LONG:
+          setLongValue(vector, row, ((LongWritable) value).get());
+          break;
+        case FLOAT:
+          setDoubleValue(vector, row, ((FloatWritable) value).get());
+          break;
+        case DOUBLE:
+          setDoubleValue(vector, row, ((DoubleWritable) value).get());
+          break;
+        case STRING:
+          setBinaryValue(vector, row, (Text) value);
+          break;
+        case CHAR:
+          setCharValue((BytesColumnVector) vector, row, (Text) value,
+              schema.getMaxLength());
+          break;
+        case VARCHAR:
+          setBinaryValue(vector, row, (Text) value, schema.getMaxLength());
+          break;
+        case BINARY:
+          setBinaryValue(vector, row, (BytesWritable) value);
+          break;
+        case DATE:
+          setLongValue(vector, row, ((DateWritable) value).getDays());
+          break;
+        case TIMESTAMP:
+          ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value);
+          break;
+        case DECIMAL:
+          ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value);
+          break;
+        case STRUCT:
+          setStructValue(schema, (StructColumnVector) vector, row,
+              (OrcStruct) value);
+          break;
+        case UNION:
+          setUnionValue(schema, (UnionColumnVector) vector, row,
+              (OrcUnion) value);
+          break;
+        case LIST:
+          setListValue(schema, (ListColumnVector) vector, row, (OrcList) value);
+          break;
+        case MAP:
+          setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown type " + schema);
+      }
+    }
+  }
+
+  @Override
+  public void write(NullWritable nullWritable, V v) throws IOException {
+    // if the batch is full, write it out.
+    if (batch.size == batch.getMaxSize()) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+
+    // add the new row
+    int row = batch.size++;
+    // skip over the OrcKey or OrcValue
+    if (v instanceof OrcKey) {
+      v = (V)((OrcKey) v).key;
+    } else if (v instanceof OrcValue) {
+      v = (V)((OrcValue) v).value;
+    }
+    if (isTopStruct) {
+      for(int f=0; f < schema.getChildren().size(); ++f) {
+        setColumn(schema.getChildren().get(f), batch.cols[f], row,
+            ((OrcStruct) v).getFieldValue(f));
+      }
+    } else {
+      setColumn(schema, batch.cols[0], row, v);
+    }
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+    writer.close();
+  }
+}
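
Because the constructor takes a plain ORC Writer, the record writer is also usable outside a MapReduce task, for example in a small utility that wants the OrcStruct to row-batch translation without a job. A minimal sketch under an assumed path and schema:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.mapred.OrcMapredRecordWriter;
import org.apache.orc.mapred.OrcStruct;

public class WriteOrcRows {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");
    Writer file = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
    OrcMapredRecordWriter<OrcStruct> writer = new OrcMapredRecordWriter<>(file);
    OrcStruct row = (OrcStruct) OrcStruct.createValue(schema);
    for (int r = 0; r < 10; ++r) {
      row.setFieldValue(0, new IntWritable(r));
      row.setFieldValue(1, new IntWritable(r * 2));
      writer.write(NullWritable.get(), row);
    }
    // close() flushes the partially filled batch before closing the file.
    writer.close(Reporter.NULL);
  }
}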

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
index 6186c83..341fbcd 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.orc.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
@@ -34,9 +35,35 @@ import org.apache.orc.Writer;
 
 import java.io.IOException;
 
+/**
+ * An ORC output format that satisfies the org.apache.hadoop.mapred API.
+ */
 public class OrcOutputFormat<V extends Writable>
     extends FileOutputFormat<NullWritable, V> {
 
+  /**
+   * This function builds the options for the ORC Writer based on the JobConf.
+   * @param conf the job configuration
+   * @return a new options object
+   */
+  public static OrcFile.WriterOptions buildOptions(Configuration conf) {
+    return OrcFile.writerOptions(conf)
+        .version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf)))
+        .setSchema(TypeDescription.fromString(OrcConf.MAPRED_OUTPUT_SCHEMA
+            .getString(conf)))
+        .compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf)))
+        .encodingStrategy(OrcFile.EncodingStrategy.valueOf
+            (OrcConf.ENCODING_STRATEGY.getString(conf)))
+        .bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf))
+        .bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf))
+        .blockSize(OrcConf.BLOCK_SIZE.getLong(conf))
+        .blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf))
+        .stripeSize(OrcConf.STRIPE_SIZE.getLong(conf))
+        .rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf))
+        .bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf))
+        .paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf));
+  }
+
   @Override
   public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem,
                                                        JobConf conf,
@@ -45,21 +72,7 @@ public class OrcOutputFormat<V extends Writable>
                                                        ) throws IOException {
     Path path = getTaskOutputPath(conf, name);
     Writer writer = OrcFile.createWriter(path,
-        OrcFile.writerOptions(conf)
-            .fileSystem(fileSystem)
-            .version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf)))
-            .setSchema(TypeDescription.fromString(OrcConf.SCHEMA.getString(conf)))
-            .compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf)))
-            .encodingStrategy(OrcFile.EncodingStrategy.valueOf
-                (OrcConf.ENCODING_STRATEGY.getString(conf)))
-            .bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf))
-            .bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf))
-            .blockSize(OrcConf.BLOCK_SIZE.getLong(conf))
-            .blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf))
-            .stripeSize(OrcConf.STRIPE_SIZE.getLong(conf))
-            .rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf))
-            .bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf))
-            .paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf)));
-    return new OrcRecordWriter<>(writer);
+        buildOptions(conf).fileSystem(fileSystem));
+    return new OrcMapredRecordWriter<>(writer);
   }
 }
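
buildOptions is now public, so code that writes ORC side files can reuse exactly the writer options a configured job would use. A small hedged sketch; the side-file path and schema are assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapred.OrcOutputFormat;

public class SideFileWriter {
  public static Writer open(Configuration conf) throws IOException {
    // buildOptions expects the output schema to be present in the configuration.
    OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<x:int>");
    OrcFile.WriterOptions options = OrcOutputFormat.buildOptions(conf);
    return OrcFile.createWriter(new Path("/tmp/side-file.orc"), options);
  }
}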

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
deleted file mode 100644
index f6bf635..0000000
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.mapred;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-public class OrcRecordReader<V extends WritableComparable>
-    implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> {
-  private final TypeDescription schema;
-  private final RecordReader batchReader;
-  private final VectorizedRowBatch batch;
-  private int rowInBatch;
-
-  protected OrcRecordReader(Reader fileReader,
-                            Reader.Options options) throws IOException {
-    this.batchReader = fileReader.rows(options);
-    if (options.getSchema() == null) {
-      schema = fileReader.getSchema();
-    } else {
-      schema = options.getSchema();
-    }
-    this.batch = schema.createRowBatch();
-    rowInBatch = 0;
-  }
-
-  /**
-   * If the current batch is empty, get a new one.
-   * @return true if we have rows available.
-   * @throws IOException
-   */
-  boolean ensureBatch() throws IOException {
-    if (rowInBatch >= batch.size) {
-      rowInBatch = 0;
-      return batchReader.nextBatch(batch);
-    }
-    return true;
-  }
-
-  @Override
-  public boolean next(NullWritable key, V value) throws IOException {
-    if (!ensureBatch()) {
-      return false;
-    }
-    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
-      OrcStruct result = (OrcStruct) value;
-      List<TypeDescription> children = schema.getChildren();
-      int numberOfChildren = children.size();
-      for(int i=0; i < numberOfChildren; ++i) {
-        result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
-            children.get(i), result.getFieldValue(i)));
-      }
-    } else {
-      nextValue(batch.cols[0], rowInBatch, schema, value);
-    }
-    rowInBatch += 1;
-    return true;
-  }
-
-  @Override
-  public NullWritable createKey() {
-    return NullWritable.get();
-  }
-
-  @Override
-  public V createValue() {
-    return (V) OrcStruct.createValue(schema);
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return 0;
-  }
-
-  @Override
-  public void close() throws IOException {
-    batchReader.close();
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    return 0;
-  }
-
-  static BooleanWritable nextBoolean(ColumnVector vector,
-                                     int row,
-                                     Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      BooleanWritable result;
-      if (previous == null || previous.getClass() != BooleanWritable.class) {
-        result = new BooleanWritable();
-      } else {
-        result = (BooleanWritable) previous;
-      }
-      result.set(((LongColumnVector) vector).vector[row] != 0);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static ByteWritable nextByte(ColumnVector vector,
-                               int row,
-                               Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      ByteWritable result;
-      if (previous == null || previous.getClass() != ByteWritable.class) {
-        result = new ByteWritable();
-      } else {
-        result = (ByteWritable) previous;
-      }
-      result.set((byte) ((LongColumnVector) vector).vector[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static ShortWritable nextShort(ColumnVector vector,
-                                 int row,
-                                 Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      ShortWritable result;
-      if (previous == null || previous.getClass() != ShortWritable.class) {
-        result = new ShortWritable();
-      } else {
-        result = (ShortWritable) previous;
-      }
-      result.set((short) ((LongColumnVector) vector).vector[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static IntWritable nextInt(ColumnVector vector,
-                             int row,
-                             Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      IntWritable result;
-      if (previous == null || previous.getClass() != IntWritable.class) {
-        result = new IntWritable();
-      } else {
-        result = (IntWritable) previous;
-      }
-      result.set((int) ((LongColumnVector) vector).vector[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static LongWritable nextLong(ColumnVector vector,
-                               int row,
-                               Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      LongWritable result;
-      if (previous == null || previous.getClass() != LongWritable.class) {
-        result = new LongWritable();
-      } else {
-        result = (LongWritable) previous;
-      }
-      result.set(((LongColumnVector) vector).vector[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static FloatWritable nextFloat(ColumnVector vector,
-                                 int row,
-                                 Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      FloatWritable result;
-      if (previous == null || previous.getClass() != FloatWritable.class) {
-        result = new FloatWritable();
-      } else {
-        result = (FloatWritable) previous;
-      }
-      result.set((float) ((DoubleColumnVector) vector).vector[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static DoubleWritable nextDouble(ColumnVector vector,
-                                   int row,
-                                   Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      DoubleWritable result;
-      if (previous == null || previous.getClass() != DoubleWritable.class) {
-        result = new DoubleWritable();
-      } else {
-        result = (DoubleWritable) previous;
-      }
-      result.set(((DoubleColumnVector) vector).vector[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static Text nextString(ColumnVector vector,
-                         int row,
-                         Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      Text result;
-      if (previous == null || previous.getClass() != Text.class) {
-        result = new Text();
-      } else {
-        result = (Text) previous;
-      }
-      BytesColumnVector bytes = (BytesColumnVector) vector;
-      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static BytesWritable nextBinary(ColumnVector vector,
-                                  int row,
-                                  Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      BytesWritable result;
-      if (previous == null || previous.getClass() != BytesWritable.class) {
-        result = new BytesWritable();
-      } else {
-        result = (BytesWritable) previous;
-      }
-      BytesColumnVector bytes = (BytesColumnVector) vector;
-      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static HiveDecimalWritable nextDecimal(ColumnVector vector,
-                                         int row,
-                                         Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      HiveDecimalWritable result;
-      if (previous == null || previous.getClass() != HiveDecimalWritable.class) {
-        result = new HiveDecimalWritable();
-      } else {
-        result = (HiveDecimalWritable) previous;
-      }
-      result.set(((DecimalColumnVector) vector).vector[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static DateWritable nextDate(ColumnVector vector,
-                               int row,
-                               Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      DateWritable result;
-      if (previous == null || previous.getClass() != DateWritable.class) {
-        result = new DateWritable();
-      } else {
-        result = (DateWritable) previous;
-      }
-      int date = (int) ((LongColumnVector) vector).vector[row];
-      result.set(date);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static OrcTimestamp nextTimestamp(ColumnVector vector,
-                                    int row,
-                                    Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      OrcTimestamp result;
-      if (previous == null || previous.getClass() != OrcTimestamp.class) {
-        result = new OrcTimestamp();
-      } else {
-        result = (OrcTimestamp) previous;
-      }
-      TimestampColumnVector tcv = (TimestampColumnVector) vector;
-      result.setTime(tcv.time[row]);
-      result.setNanos(tcv.nanos[row]);
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static OrcStruct nextStruct(ColumnVector vector,
-                              int row,
-                              TypeDescription schema,
-                              Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      OrcStruct result;
-      List<TypeDescription> childrenTypes = schema.getChildren();
-      int numChildren = childrenTypes.size();
-      if (previous == null || previous.getClass() != OrcStruct.class) {
-        result = new OrcStruct(schema);
-      } else {
-        result = (OrcStruct) previous;
-      }
-      StructColumnVector struct = (StructColumnVector) vector;
-      for(int f=0; f < numChildren; ++f) {
-        result.setFieldValue(f, nextValue(struct.fields[f], row,
-            childrenTypes.get(f), result.getFieldValue(f)));
-      }
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static OrcUnion nextUnion(ColumnVector vector,
-                            int row,
-                            TypeDescription schema,
-                            Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      OrcUnion result;
-      List<TypeDescription> childrenTypes = schema.getChildren();
-      if (previous == null || previous.getClass() != OrcUnion.class) {
-        result = new OrcUnion(schema);
-      } else {
-        result = (OrcUnion) previous;
-      }
-      UnionColumnVector union = (UnionColumnVector) vector;
-      byte tag = (byte) union.tags[row];
-      result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
-          result.getObject()));
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static OrcList nextList(ColumnVector vector,
-                          int row,
-                          TypeDescription schema,
-                          Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      OrcList result;
-      List<TypeDescription> childrenTypes = schema.getChildren();
-      TypeDescription valueType = childrenTypes.get(0);
-      if (previous == null ||
-          previous.getClass() != ArrayList.class) {
-        result = new OrcList(schema);
-      } else {
-        result = (OrcList) previous;
-      }
-      ListColumnVector list = (ListColumnVector) vector;
-      int length = (int) list.lengths[row];
-      int offset = (int) list.offsets[row];
-      result.ensureCapacity(length);
-      int oldLength = result.size();
-      int idx = 0;
-      while (idx < length && idx < oldLength) {
-        result.set(idx, nextValue(list.child, offset + idx, valueType,
-            result.get(idx)));
-        idx += 1;
-      }
-      if (length < oldLength) {
-        for(int i= oldLength - 1; i >= length; --i) {
-          result.remove(i);
-        }
-      } else if (oldLength < length) {
-        while (idx < length) {
-          result.add(nextValue(list.child, offset + idx, valueType, null));
-          idx += 1;
-        }
-      }
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static OrcMap nextMap(ColumnVector vector,
-                        int row,
-                        TypeDescription schema,
-                        Object previous) {
-    if (vector.isRepeating) {
-      row = 0;
-    }
-    if (vector.noNulls || !vector.isNull[row]) {
-      MapColumnVector map = (MapColumnVector) vector;
-      int length = (int) map.lengths[row];
-      int offset = (int) map.offsets[row];
-      OrcMap result;
-      List<TypeDescription> childrenTypes = schema.getChildren();
-      TypeDescription keyType = childrenTypes.get(0);
-      TypeDescription valueType = childrenTypes.get(1);
-      if (previous == null ||
-          previous.getClass() != OrcMap.class) {
-        result = new OrcMap(schema);
-      } else {
-        result = (OrcMap) previous;
-        // I couldn't think of a good way to reuse the keys and value objects
-        // without even more allocations, so take the easy and safe approach.
-        result.clear();
-      }
-      for(int e=0; e < length; ++e) {
-        result.put(nextValue(map.keys, e + offset, keyType, null),
-                   nextValue(map.values, e + offset, valueType, null));
-      }
-      return result;
-    } else {
-      return null;
-    }
-  }
-
-  static WritableComparable nextValue(ColumnVector vector,
-                                      int row,
-                                      TypeDescription schema,
-                                      Object previous) {
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        return nextBoolean(vector, row, previous);
-      case BYTE:
-        return nextByte(vector, row, previous);
-      case SHORT:
-        return nextShort(vector, row, previous);
-      case INT:
-        return nextInt(vector, row, previous);
-      case LONG:
-        return nextLong(vector, row, previous);
-      case FLOAT:
-        return nextFloat(vector, row, previous);
-      case DOUBLE:
-        return nextDouble(vector, row, previous);
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return nextString(vector, row, previous);
-      case BINARY:
-        return nextBinary(vector, row, previous);
-      case DECIMAL:
-        return nextDecimal(vector, row, previous);
-      case DATE:
-        return nextDate(vector, row, previous);
-      case TIMESTAMP:
-        return nextTimestamp(vector, row, previous);
-      case STRUCT:
-        return nextStruct(vector, row, schema, previous);
-      case UNION:
-        return nextUnion(vector, row, schema, previous);
-      case LIST:
-        return nextList(vector, row, schema, previous);
-      case MAP:
-        return nextMap(vector, row, schema, previous);
-      default:
-        throw new IllegalArgumentException("Unknown type " + schema);
-    }
-  }
-}

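The removed reader above (replaced by OrcMapredRecordReader) documents the batch-at-a-time pattern: refill the VectorizedRowBatch once rowInBatch reaches batch.size, then materialize a Writable per column, honoring isRepeating and the null flags. A minimal sketch of the same pattern against the core Reader API, for illustration only (not part of this patch; the file name is hypothetical and the first column is assumed to be an integer type):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class BatchReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Reader reader = OrcFile.createReader(new Path("example.orc"),
        OrcFile.readerOptions(conf));
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    // Same loop shape as ensureBatch()/next() above: refill when the batch
    // is exhausted, then walk the rows in the current batch.
    while (rows.nextBatch(batch)) {
      LongColumnVector col0 = (LongColumnVector) batch.cols[0];
      for (int r = 0; r < batch.size; ++r) {
        int row = col0.isRepeating ? 0 : r;
        // Respect the null flags, as the nextXxx helpers above do.
        long x = col0.noNulls || !col0.isNull[row] ? col0.vector[row] : -1;
        System.out.println(x);
      }
    }
    rows.close();
  }
}
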
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
deleted file mode 100644
index 4237656..0000000
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.mapred;
-
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.BinaryComparable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class OrcRecordWriter<V extends Writable>
-    implements RecordWriter<NullWritable, V> {
-  private final Writer writer;
-  private final VectorizedRowBatch batch;
-  private final TypeDescription schema;
-  private final boolean isTopStruct;
-
-  public OrcRecordWriter(Writer writer) {
-    this.writer = writer;
-    schema = writer.getSchema();
-    this.batch = schema.createRowBatch();
-    isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
-  }
-
-  static void setLongValue(ColumnVector vector, int row, long value) {
-    ((LongColumnVector) vector).vector[row] = value;
-  }
-
-  static void setDoubleValue(ColumnVector vector, int row, double value) {
-    ((DoubleColumnVector) vector).vector[row] = value;
-  }
-
-  static void setBinaryValue(ColumnVector vector, int row,
-                             BinaryComparable value) {
-    ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
-        value.getLength());
-  }
-
-  static void setBinaryValue(ColumnVector vector, int row,
-                             BinaryComparable value, int maxLength) {
-    ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
-        Math.min(maxLength, value.getLength()));
-  }
-
-  private static final ThreadLocal<byte[]> SPACE_BUFFER =
-      new ThreadLocal<byte[]>() {
-        @Override
-        protected byte[] initialValue() {
-          byte[] result = new byte[100];
-          Arrays.fill(result, (byte) ' ');
-          return result;
-        }
-      };
-
-  static void setCharValue(BytesColumnVector vector,
-                           int row,
-                           Text value,
-                           int length) {
-    // we need to trim or pad the string with spaces to required length
-    int actualLength = value.getLength();
-    if (actualLength >= length) {
-      setBinaryValue(vector, row, value, length);
-    } else {
-      byte[] spaces = SPACE_BUFFER.get();
-      if (length - actualLength > spaces.length) {
-        spaces = new byte[length - actualLength];
-        Arrays.fill(spaces, (byte)' ');
-        SPACE_BUFFER.set(spaces);
-      }
-      vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0,
-          length - actualLength);
-    }
-  }
-
-  static void setStructValue(TypeDescription schema,
-                             StructColumnVector vector,
-                             int row,
-                             OrcStruct value) {
-    List<TypeDescription> children = schema.getChildren();
-    for(int c=0; c < value.getNumFields(); ++c) {
-      setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c));
-    }
-  }
-
-  static void setUnionValue(TypeDescription schema,
-                            UnionColumnVector vector,
-                            int row,
-                            OrcUnion value) {
-    List<TypeDescription> children = schema.getChildren();
-    int tag = value.getTag() & 0xff;
-    vector.tags[row] = tag;
-    setColumn(children.get(tag), vector.fields[tag], row, value.getObject());
-  }
-
-
-  static void setListValue(TypeDescription schema,
-                           ListColumnVector vector,
-                           int row,
-                           OrcList value) {
-    TypeDescription elemType = schema.getChildren().get(0);
-    vector.offsets[row] = vector.childCount;
-    vector.lengths[row] = value.size();
-    vector.childCount += vector.lengths[row];
-    vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0);
-    for(int e=0; e < vector.lengths[row]; ++e) {
-      setColumn(elemType, vector.child, (int) vector.offsets[row] + e,
-          (Writable) value.get(e));
-    }
-  }
-
-  static void setMapValue(TypeDescription schema,
-                          MapColumnVector vector,
-                          int row,
-                          OrcMap<?,?> value) {
-    TypeDescription keyType = schema.getChildren().get(0);
-    TypeDescription valueType = schema.getChildren().get(1);
-    vector.offsets[row] = vector.childCount;
-    vector.lengths[row] = value.size();
-    vector.childCount += vector.lengths[row];
-    vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0);
-    vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0);
-    int e = 0;
-    for(Map.Entry<?,?> entry: value.entrySet()) {
-      setColumn(keyType, vector.keys, (int) vector.offsets[row] + e,
-          (Writable) entry.getKey());
-      setColumn(valueType, vector.values, (int) vector.offsets[row] + e,
-          (Writable) entry.getValue());
-      e += 1;
-    }
-  }
-
-  static void setColumn(TypeDescription schema,
-                        ColumnVector vector,
-                        int row,
-                        Writable value) {
-    if (value == null) {
-      vector.noNulls = false;
-      vector.isNull[row] = true;
-    } else {
-      switch (schema.getCategory()) {
-        case BOOLEAN:
-          setLongValue(vector, row, ((BooleanWritable) value).get() ? 1 : 0);
-          break;
-        case BYTE:
-          setLongValue(vector, row, ((ByteWritable) value).get());
-          break;
-        case SHORT:
-          setLongValue(vector, row, ((ShortWritable) value).get());
-          break;
-        case INT:
-          setLongValue(vector, row, ((IntWritable) value).get());
-          break;
-        case LONG:
-          setLongValue(vector, row, ((LongWritable) value).get());
-          break;
-        case FLOAT:
-          setDoubleValue(vector, row, ((FloatWritable) value).get());
-          break;
-        case DOUBLE:
-          setDoubleValue(vector, row, ((DoubleWritable) value).get());
-          break;
-        case STRING:
-          setBinaryValue(vector, row, (Text) value);
-          break;
-        case CHAR:
-          setCharValue((BytesColumnVector) vector, row, (Text) value,
-              schema.getMaxLength());
-          break;
-        case VARCHAR:
-          setBinaryValue(vector, row, (Text) value, schema.getMaxLength());
-          break;
-        case BINARY:
-          setBinaryValue(vector, row, (BytesWritable) value);
-          break;
-        case DATE:
-          setLongValue(vector, row, ((DateWritable) value).getDays());
-          break;
-        case TIMESTAMP:
-          ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value);
-          break;
-        case DECIMAL:
-          ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value);
-          break;
-        case STRUCT:
-          setStructValue(schema, (StructColumnVector) vector, row,
-              (OrcStruct) value);
-          break;
-        case UNION:
-          setUnionValue(schema, (UnionColumnVector) vector, row,
-              (OrcUnion) value);
-          break;
-        case LIST:
-          setListValue(schema, (ListColumnVector) vector, row, (OrcList) value);
-          break;
-        case MAP:
-          setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value);
-          break;
-        default:
-          throw new IllegalArgumentException("Unknown type " + schema);
-      }
-    }
-  }
-
-  @Override
-  public void write(NullWritable nullWritable, V v) throws IOException {
-    // if the batch is full, write it out.
-    if (batch.size == batch.getMaxSize()) {
-      writer.addRowBatch(batch);
-      batch.reset();
-    }
-
-    // add the new row
-    int row = batch.size++;
-    if (isTopStruct) {
-      for(int f=0; f < schema.getChildren().size(); ++f) {
-        setColumn(schema.getChildren().get(f), batch.cols[f], row,
-            ((OrcStruct) v).getFieldValue(f));
-      }
-    } else {
-      setColumn(schema, batch.cols[0], row, v);
-    }
-  }
-
-  @Override
-  public void close(Reporter reporter) throws IOException {
-    if (batch.size != 0) {
-      writer.addRowBatch(batch);
-      batch.reset();
-    }
-    writer.close();
-  }
-}

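The deleted writer's write()/close() methods capture the batching contract that OrcMapredRecordWriter now implements: flush with addRowBatch() when the batch is full, claim a row slot with batch.size++, fill each column vector, and flush the remainder on close. A minimal sketch of the same loop against the core Writer API, for illustration only (not part of this patch; the file name, schema, and row values are made up):

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class BatchWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    TypeDescription schema =
        TypeDescription.fromString("struct<x:int,y:string>");
    Writer writer = OrcFile.createWriter(new Path("example.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
    for (int i = 0; i < 100000; ++i) {
      // Same shape as write(): flush when full, then fill the next row slot.
      if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
      }
      int row = batch.size++;
      x.vector[row] = i;
      byte[] value = ("row-" + i).getBytes(StandardCharsets.UTF_8);
      y.setRef(row, value, 0, value.length);
    }
    // Same shape as close(): flush the partial batch before closing.
    if (batch.size != 0) {
      writer.addRowBatch(batch);
      batch.reset();
    }
    writer.close();
  }
}
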
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
index 2dd749b..51e0d60 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
@@ -82,6 +82,14 @@ public final class OrcStruct implements WritableComparable<OrcStruct> {
   }
 
   /**
+   * Get the schema for this object.
+   * @return the schema object
+   */
+  public TypeDescription getSchema() {
+    return schema;
+  }
+
+  /**
    * Set all of the fields in the struct
    * @param values the list of values for each of the fields.
    */

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
index ee97b9f..5564177 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
@@ -17,12 +17,12 @@
  */
 package org.apache.orc.mapred;
 
-
 import org.apache.hadoop.io.WritableComparable;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Date;
 import java.sql.Timestamp;
 import java.util.Date;
 

http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java
new file mode 100644
index 0000000..dc9912d
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcValue.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * This type provides a wrapper for OrcStruct so that it can be sent through
+ * the MapReduce shuffle as a value.
+ *
+ * The user should set orc.mapred.value.type in the JobConf to the type
+ * string of the value.
+ */
+public final class OrcValue implements Writable, JobConfigurable {
+
+  public WritableComparable value;
+
+  public OrcValue(WritableComparable value) {
+    this.value = value;
+  }
+
+  public OrcValue() {
+    value = null;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    value.write(dataOutput);
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    value.readFields(dataInput);
+  }
+
+  @Override
+  public void configure(JobConf conf) {
+    if (value == null) {
+      TypeDescription schema =
+          TypeDescription.fromString(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA
+              .getString(conf));
+      value = OrcStruct.createValue(schema);
+    }
+  }
+}

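Since OrcValue is new in this patch, a brief sketch of the intended map-side usage, for illustration only (the schema string and field values are made up). On the reduce side, configure() rebuilds the wrapped value from the OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA setting before readFields() fills it in:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcValue;

public class OrcValueExample {
  public static void main(String[] args) {
    TypeDescription schema =
        TypeDescription.fromString("struct<x:int,y:string>");
    OrcStruct struct = (OrcStruct) OrcStruct.createValue(schema);
    struct.setFieldValue(0, new IntWritable(1));
    struct.setFieldValue(1, new Text("shuffle me"));
    // Wrap the struct so it can travel through the shuffle as a value;
    // the framework serializes it via OrcValue.write()/readFields().
    OrcValue value = new OrcValue(struct);
  }
}
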
http://git-wip-us.apache.org/repos/asf/orc/blob/3bb5ce53/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
new file mode 100644
index 0000000..9427e78
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcInputFormat.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+
+/**
+ * An ORC input format that satisfies the org.apache.hadoop.mapreduce API.
+ */
+public class OrcInputFormat<V extends WritableComparable>
+    extends FileInputFormat<NullWritable, V> {
+
+  /**
+   * Put the given SearchArgument into the configuration for an OrcInputFormat.
+   * @param conf the configuration to modify
+   * @param sarg the SearchArgument to put in the configuration
+   * @param columnNames the list of column names for the SearchArgument
+   */
+  public static void setSearchArgument(Configuration conf,
+                                       SearchArgument sarg,
+                                       String[] columnNames) {
+    org.apache.orc.mapred.OrcInputFormat.setSearchArgument(conf, sarg,
+        columnNames);
+  }
+
+  @Override
+  public RecordReader<NullWritable, V>
+      createRecordReader(InputSplit inputSplit,
+                         TaskAttemptContext taskAttemptContext
+                         ) throws IOException, InterruptedException {
+    FileSplit split = (FileSplit) inputSplit;
+    Configuration conf = taskAttemptContext.getConfiguration();
+    Reader file = OrcFile.createReader(split.getPath(),
+        OrcFile.readerOptions(conf)
+            .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
+    return new OrcMapreduceRecordReader<>(file,
+        org.apache.orc.mapred.OrcInputFormat.buildOptions(conf,
+            file, split.getStart(), split.getLength()));
+  }
+}
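
To tie the new mapreduce classes together, a minimal sketch of a map-only job that reads ORC rows through this input format, for illustration only (not part of this patch; paths come from the command line and the mapper just echoes each row's string form). Predicate pushdown would additionally build a SearchArgument and pass it to setSearchArgument() before submitting the job.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;

public class OrcReadJob {
  // Keys are NullWritable and values are the root OrcStruct of each row,
  // matching the <NullWritable, V> signature of OrcInputFormat above.
  public static class RowMapper
      extends Mapper<NullWritable, OrcStruct, Text, NullWritable> {
    @Override
    protected void map(NullWritable key, OrcStruct value, Context context)
        throws java.io.IOException, InterruptedException {
      context.write(new Text(value.toString()), NullWritable.get());
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "orc read example");
    job.setJarByClass(OrcReadJob.class);
    job.setInputFormatClass(OrcInputFormat.class);
    job.setMapperClass(RowMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}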