Posted to commits@orc.apache.org by om...@apache.org on 2016/05/13 19:50:34 UTC

[05/23] orc git commit: ORC-1 Import of ORC code from Hive. (omalley reviewed by prasanthj)

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/java/mapreduce/pom.xml b/java/mapreduce/pom.xml
new file mode 100644
index 0000000..8792e1c
--- /dev/null
+++ b/java/mapreduce/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.orc</groupId>
+    <artifactId>orc</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>orc-mapreduce</artifactId>
+  <packaging>jar</packaging>
+  <name>ORC MapReduce</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-storage-api</artifactId>
+      <version>2.0.0.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <version>1.1.0-SNAPSHOT</version>
+    </dependency>
+
+    <!-- inter-project -->
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>${commons-codec.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>jsp-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet.jsp</groupId>
+          <artifactId>jsp-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- test inter-project -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <testResources>
+      <testResource>
+        <directory>${basedir}/src/test/resources</directory>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.1</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
new file mode 100644
index 0000000..78e75f7
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcInputFormat.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+
+/**
+ * A MapReduce/Hive input format for ORC files.
+ */
+public class OrcInputFormat<V extends Writable>
+    extends FileInputFormat<NullWritable, V> {
+
+  /**
+   * Convert a string with a comma-separated list of top-level column ids
+   * into a boolean array, indexed by type id, that marks each selected
+   * column and all of its children as included.
+   * @param schema the schema for the reader
+   * @param columnsStr the comma-separated list of column ids
+   * @return the boolean array of included type ids, or null to read everything
+   */
+  static boolean[] parseInclude(TypeDescription schema, String columnsStr) {
+    if (columnsStr == null ||
+        schema.getCategory() != TypeDescription.Category.STRUCT) {
+      return null;
+    }
+    boolean[] result = new boolean[schema.getMaximumId() + 1];
+    result[0] = true;
+    List<TypeDescription> types = schema.getChildren();
+    for(String idString: columnsStr.split(",")) {
+      TypeDescription type = types.get(Integer.parseInt(idString));
+      for(int c=type.getId(); c <= type.getMaximumId(); ++c) {
+        result[c] = true;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Put the given SearchArgument into the configuration for an OrcInputFormat.
+   * @param conf the configuration to modify
+   * @param sarg the SearchArgument to put in the configuration
+   * @param columnNames the list of column names for the SearchArgument
+   */
+  public static void setSearchArgument(Configuration conf,
+                                       SearchArgument sarg,
+                                       String[] columnNames) {
+    // a no-arg Output has no backing buffer, so start small and let it grow
+    Output out = new Output(1024, -1);
+    new Kryo().writeObject(out, sarg);
+    conf.set(OrcConf.KRYO_SARG.getAttribute(),
+        Base64.encodeBase64String(out.toBytes()));
+    StringBuilder buffer = new StringBuilder();
+    for(int i=0; i < columnNames.length; ++i) {
+      if (i != 0) {
+        buffer.append(',');
+      }
+      buffer.append(columnNames[i]);
+    }
+    conf.set(OrcConf.SARG_COLUMNS.getAttribute(), buffer.toString());
+  }
+
+  @Override
+  public RecordReader<NullWritable, V>
+  getRecordReader(InputSplit inputSplit,
+                  JobConf conf,
+                  Reporter reporter) throws IOException {
+    FileSplit split = (FileSplit) inputSplit;
+    Reader file = OrcFile.createReader(split.getPath(),
+        OrcFile.readerOptions(conf)
+            .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)));
+    String schemaString = OrcConf.SCHEMA.getString(conf);
+    TypeDescription schema = schemaString == null ?
+        null : TypeDescription.fromString(schemaString);
+    Reader.Options options = new Reader.Options()
+        .range(split.getStart(), split.getLength())
+        .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+        .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf));
+    if (schema == null) {
+      schema = file.getSchema();
+    } else {
+      options.schema(schema);
+    }
+    options.include(parseInclude(schema,
+        OrcConf.INCLUDE_COLUMNS.getString(conf)));
+    String kryoSarg = OrcConf.KRYO_SARG.getString(conf);
+    String sargColumns = OrcConf.SARG_COLUMNS.getString(conf);
+    if (kryoSarg != null && sargColumns != null) {
+      byte[] sargBytes = Base64.decodeBase64(kryoSarg);
+      SearchArgument sarg =
+          new Kryo().readObject(new Input(sargBytes), SearchArgumentImpl.class);
+      options.searchArgument(sarg, sargColumns.split(","));
+    }
+    return new OrcRecordReader(file, options);
+  }
+}
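
Beyond the patch itself, a minimal usage sketch may help. The class name, paths, schema string, and predicate below are illustrative assumptions; the OrcConf attributes and the setSearchArgument call come from the code above, and the builder is assumed from the hive-storage-api dependency declared in the pom.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.orc.OrcConf;
    import org.apache.orc.mapred.OrcInputFormat;

    public class OrcReadJobSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setInputFormat(OrcInputFormat.class);
        FileInputFormat.addInputPath(conf, new Path("/tmp/orc-input"));

        // the reader schema and the top-level column ids to include
        // (parseInclude expands each id to cover its children)
        conf.set(OrcConf.SCHEMA.getAttribute(),
            "struct<name:string,age:int>");
        conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute(), "0,1");

        // push a row filter down to the ORC reader; it travels through the
        // configuration serialized with Kryo, as shown above
        SearchArgument sarg = SearchArgumentFactory.newBuilder()
            .startAnd()
            .lessThan("age", PredicateLeaf.Type.LONG, 30L)
            .end()
            .build();
        OrcInputFormat.setSearchArgument(conf, sarg,
            new String[]{"name", "age"});
      }
    }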

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcList.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcList.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcList.java
new file mode 100644
index 0000000..2b94207
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcList.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * An ArrayList subclass that implements Writable.
+ * @param <E> the element type, which must be Writable
+ */
+public class OrcList<E extends Writable>
+    extends ArrayList<E> implements Writable {
+  private final TypeDescription childSchema;
+
+  public OrcList(TypeDescription schema) {
+    childSchema = schema.getChildren().get(0);
+  }
+
+  public OrcList(TypeDescription schema, int initialCapacity) {
+    super(initialCapacity);
+    childSchema = schema.getChildren().get(0);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    Iterator<E> itr = iterator();
+    output.writeInt(size());
+    while (itr.hasNext()) {
+      E obj = itr.next();
+      output.writeBoolean(obj != null);
+      if (obj != null) {
+        obj.write(output);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    clear();
+    int size = input.readInt();
+    ensureCapacity(size);
+    for(int i=0; i < size; ++i) {
+      if (input.readBoolean()) {
+        E obj = (E) OrcStruct.createValue(childSchema);
+        obj.readFields(input);
+        add(obj);
+      } else {
+        add(null);
+      }
+    }
+  }
+}
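
A small, self-contained round-trip sketch (not part of the patch) of the format above: a size, then a presence boolean and the element bytes for each entry. Hadoop's DataOutputBuffer/DataInputBuffer stand in for real streams.

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcList;

    public class OrcListRoundTrip {
      public static void main(String[] args) throws Exception {
        TypeDescription schema = TypeDescription.fromString("array<int>");
        OrcList<IntWritable> list = new OrcList<>(schema);
        list.add(new IntWritable(1));
        list.add(null);                    // nulls survive the round trip
        list.add(new IntWritable(3));

        DataOutputBuffer out = new DataOutputBuffer();
        list.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        OrcList<IntWritable> copy = new OrcList<>(schema);
        copy.readFields(in);
        System.out.println(copy);          // [1, null, 3]
      }
    }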

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
new file mode 100644
index 0000000..db961fc
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMap.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A TreeMap subclass that implements Writable.
+ * @param <K> the key type, which must be Writable
+ * @param <V> the value type, which must be Writable
+ */
+public final class OrcMap<K extends Writable, V extends Writable>
+    extends TreeMap<K, V> implements Writable {
+  private final TypeDescription keySchema;
+  private final TypeDescription valueSchema;
+
+  public OrcMap(TypeDescription schema) {
+    keySchema = schema.getChildren().get(0);
+    valueSchema = schema.getChildren().get(1);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(size());
+    for(Map.Entry<K,V> entry: entrySet()) {
+      K key = entry.getKey();
+      V value = entry.getValue();
+      output.writeByte((key == null ? 0 : 2) | (value == null ? 0 : 1));
+      if (key != null) {
+        key.write(output);
+      }
+      if (value != null) {
+        value.write(output);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    clear();
+    int size = input.readInt();
+    for(int i=0; i < size; ++i) {
+      byte flag = input.readByte();
+      K key;
+      V value;
+      if ((flag & 2) != 0) {
+        key = (K) OrcStruct.createValue(keySchema);
+        key.readFields(input);
+      } else {
+        key = null;
+      }
+      if ((flag & 1) != 0) {
+        value = (V) OrcStruct.createValue(valueSchema);
+        value.readFields(input);
+      } else {
+        value = null;
+      }
+      put(key, value);
+    }
+  }
+}
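
The same round-trip idea for OrcMap (again illustrative, not from the patch): the flag byte written per entry carries key presence in bit 1 and value presence in bit 0, so null keys and values survive serialization.

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcMap;

    public class OrcMapRoundTrip {
      public static void main(String[] args) throws Exception {
        TypeDescription schema = TypeDescription.fromString("map<string,int>");
        OrcMap<Text, IntWritable> map = new OrcMap<>(schema);
        map.put(new Text("answer"), new IntWritable(42));
        map.put(new Text("unknown"), null);   // flag byte will be 2 (key only)

        DataOutputBuffer out = new DataOutputBuffer();
        map.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        OrcMap<Text, IntWritable> copy = new OrcMap<>(schema);
        copy.readFields(in);
        System.out.println(copy);             // {answer=42, unknown=null}
      }
    }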

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
new file mode 100644
index 0000000..6186c83
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+
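+/**
+ * An OutputFormat for the classic mapred API that writes ORC files; all
+ * writer options are read from the JobConf via OrcConf.
+ */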
+public class OrcOutputFormat<V extends Writable>
+    extends FileOutputFormat<NullWritable, V> {
+
+  @Override
+  public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem,
+                                                       JobConf conf,
+                                                       String name,
+                                                       Progressable progressable
+                                                       ) throws IOException {
+    Path path = getTaskOutputPath(conf, name);
+    Writer writer = OrcFile.createWriter(path,
+        OrcFile.writerOptions(conf)
+            .fileSystem(fileSystem)
+            .version(OrcFile.Version.byName(OrcConf.WRITE_FORMAT.getString(conf)))
+            .setSchema(TypeDescription.fromString(OrcConf.SCHEMA.getString(conf)))
+            .compress(CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf)))
+            .encodingStrategy(OrcFile.EncodingStrategy.valueOf
+                (OrcConf.ENCODING_STRATEGY.getString(conf)))
+            .bloomFilterColumns(OrcConf.BLOOM_FILTER_COLUMNS.getString(conf))
+            .bloomFilterFpp(OrcConf.BLOOM_FILTER_FPP.getDouble(conf))
+            .blockSize(OrcConf.BLOCK_SIZE.getLong(conf))
+            .blockPadding(OrcConf.BLOCK_PADDING.getBoolean(conf))
+            .stripeSize(OrcConf.STRIPE_SIZE.getLong(conf))
+            .rowIndexStride((int) OrcConf.ROW_INDEX_STRIDE.getLong(conf))
+            .bufferSize((int) OrcConf.BUFFER_SIZE.getLong(conf))
+            .paddingTolerance(OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf)));
+    return new OrcRecordWriter<>(writer);
+  }
+}
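
A matching configuration sketch for the write path, with illustrative values: the schema is required by the writer, while the compression and stripe-size settings merely override the OrcConf defaults read above.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.orc.OrcConf;
    import org.apache.orc.mapred.OrcOutputFormat;

    public class OrcWriteJobSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.setOutputFormat(OrcOutputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path("/tmp/orc-output"));

        // the writer needs a schema; the rest override OrcConf defaults
        conf.set(OrcConf.SCHEMA.getAttribute(),
            "struct<name:string,age:int>");
        conf.set(OrcConf.COMPRESS.getAttribute(), "SNAPPY");
        conf.set(OrcConf.STRIPE_SIZE.getAttribute(),
            Long.toString(64L * 1024 * 1024));
      }
    }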

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
new file mode 100644
index 0000000..0370ae5
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordReader.java
@@ -0,0 +1,547 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
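+/**
+ * A RecordReader that returns each row of an ORC file as a Writable value,
+ * decoding one VectorizedRowBatch at a time.
+ */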
+public class OrcRecordReader<V extends Writable>
+    implements org.apache.hadoop.mapred.RecordReader<NullWritable, V> {
+  private final TypeDescription schema;
+  private final RecordReader batchReader;
+  private final VectorizedRowBatch batch;
+  private int rowInBatch;
+
+  protected OrcRecordReader(Reader fileReader,
+                            Reader.Options options) throws IOException {
+    this.batchReader = fileReader.rows(options);
+    if (options.getSchema() == null) {
+      schema = fileReader.getSchema();
+    } else {
+      schema = options.getSchema();
+    }
+    this.batch = schema.createRowBatch();
+    rowInBatch = 0;
+  }
+
+  /**
+   * If the current batch has been fully consumed, fetch the next one.
+   * @return true if rows are available.
+   * @throws IOException if the underlying reader fails
+   */
+  boolean ensureBatch() throws IOException {
+    if (rowInBatch >= batch.size) {
+      rowInBatch = 0;
+      return batchReader.nextBatch(batch);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next(NullWritable key, V value) throws IOException {
+    if (!ensureBatch()) {
+      return false;
+    }
+    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+      OrcStruct result = (OrcStruct) value;
+      List<TypeDescription> children = schema.getChildren();
+      int numberOfChildren = children.size();
+      for(int i=0; i < numberOfChildren; ++i) {
+        result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
+            children.get(i), result.getFieldValue(i)));
+      }
+    } else {
+      nextValue(batch.cols[0], rowInBatch, schema, value);
+    }
+    rowInBatch += 1;
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public V createValue() {
+    return (V) OrcStruct.createValue(schema);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    batchReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  static BooleanWritable nextBoolean(ColumnVector vector,
+                                     int row,
+                                     Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      BooleanWritable result;
+      if (previous == null || previous.getClass() != BooleanWritable.class) {
+        result = new BooleanWritable();
+      } else {
+        result = (BooleanWritable) previous;
+      }
+      result.set(((LongColumnVector) vector).vector[row] != 0);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static ByteWritable nextByte(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ByteWritable result;
+      if (previous == null || previous.getClass() != ByteWritable.class) {
+        result = new ByteWritable();
+      } else {
+        result = (ByteWritable) previous;
+      }
+      result.set((byte) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static ShortWritable nextShort(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ShortWritable result;
+      if (previous == null || previous.getClass() != ShortWritable.class) {
+        result = new ShortWritable();
+      } else {
+        result = (ShortWritable) previous;
+      }
+      result.set((short) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static IntWritable nextInt(ColumnVector vector,
+                             int row,
+                             Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      IntWritable result;
+      if (previous == null || previous.getClass() != IntWritable.class) {
+        result = new IntWritable();
+      } else {
+        result = (IntWritable) previous;
+      }
+      result.set((int) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static LongWritable nextLong(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      LongWritable result;
+      if (previous == null || previous.getClass() != LongWritable.class) {
+        result = new LongWritable();
+      } else {
+        result = (LongWritable) previous;
+      }
+      result.set(((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static FloatWritable nextFloat(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      FloatWritable result;
+      if (previous == null || previous.getClass() != FloatWritable.class) {
+        result = new FloatWritable();
+      } else {
+        result = (FloatWritable) previous;
+      }
+      result.set((float) ((DoubleColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static DoubleWritable nextDouble(ColumnVector vector,
+                                   int row,
+                                   Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      DoubleWritable result;
+      if (previous == null || previous.getClass() != DoubleWritable.class) {
+        result = new DoubleWritable();
+      } else {
+        result = (DoubleWritable) previous;
+      }
+      result.set(((DoubleColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static Text nextString(ColumnVector vector,
+                         int row,
+                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      Text result;
+      if (previous == null || previous.getClass() != Text.class) {
+        result = new Text();
+      } else {
+        result = (Text) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static BytesWritable nextBinary(ColumnVector vector,
+                                  int row,
+                                  Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      BytesWritable result;
+      if (previous == null || previous.getClass() != BytesWritable.class) {
+        result = new BytesWritable();
+      } else {
+        result = (BytesWritable) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static HiveDecimalWritable nextDecimal(ColumnVector vector,
+                                         int row,
+                                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveDecimalWritable result;
+      if (previous == null || previous.getClass() != HiveDecimalWritable.class) {
+        result = new HiveDecimalWritable();
+      } else {
+        result = (HiveDecimalWritable) previous;
+      }
+      result.set(((DecimalColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static DateWritable nextDate(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      DateWritable result;
+      if (previous == null || previous.getClass() != DateWritable.class) {
+        result = new DateWritable();
+      } else {
+        result = (DateWritable) previous;
+      }
+      int date = (int) ((LongColumnVector) vector).vector[row];
+      result.set(date);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcTimestamp nextTimestamp(ColumnVector vector,
+                                    int row,
+                                    Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcTimestamp result;
+      if (previous == null || previous.getClass() != OrcTimestamp.class) {
+        result = new OrcTimestamp();
+      } else {
+        result = (OrcTimestamp) previous;
+      }
+      TimestampColumnVector tcv = (TimestampColumnVector) vector;
+      result.setTime(tcv.time[row]);
+      result.setNanos(tcv.nanos[row]);
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcStruct nextStruct(ColumnVector vector,
+                              int row,
+                              TypeDescription schema,
+                              Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcStruct result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      int numChildren = childrenTypes.size();
+      if (previous == null || previous.getClass() != OrcStruct.class) {
+        result = new OrcStruct(schema);
+      } else {
+        result = (OrcStruct) previous;
+      }
+      StructColumnVector struct = (StructColumnVector) vector;
+      for(int f=0; f < numChildren; ++f) {
+        result.setFieldValue(f, nextValue(struct.fields[f], row,
+            childrenTypes.get(f), result.getFieldValue(f)));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcUnion nextUnion(ColumnVector vector,
+                            int row,
+                            TypeDescription schema,
+                            Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcUnion result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      if (previous == null || previous.getClass() != OrcUnion.class) {
+        result = new OrcUnion(schema);
+      } else {
+        result = (OrcUnion) previous;
+      }
+      UnionColumnVector union = (UnionColumnVector) vector;
+      byte tag = (byte) union.tags[row];
+      result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
+          result.getObject()));
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcList nextList(ColumnVector vector,
+                          int row,
+                          TypeDescription schema,
+                          Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcList result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      TypeDescription valueType = childrenTypes.get(0);
+      if (previous == null ||
+          previous.getClass() != OrcList.class) {
+        result = new OrcList(schema);
+      } else {
+        result = (OrcList) previous;
+      }
+      ListColumnVector list = (ListColumnVector) vector;
+      int length = (int) list.lengths[row];
+      int offset = (int) list.offsets[row];
+      result.ensureCapacity(length);
+      int oldLength = result.size();
+      int idx = 0;
+      while (idx < length && idx < oldLength) {
+        result.set(idx, nextValue(list.child, offset + idx, valueType,
+            result.get(idx)));
+        idx += 1;
+      }
+      if (length < oldLength) {
+        for(int i= oldLength - 1; i >= length; --i) {
+          result.remove(i);
+        }
+      } else if (oldLength < length) {
+        while (idx < length) {
+          result.add(nextValue(list.child, offset + idx, valueType, null));
+          idx += 1;
+        }
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static OrcMap nextMap(ColumnVector vector,
+                        int row,
+                        TypeDescription schema,
+                        Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      MapColumnVector map = (MapColumnVector) vector;
+      int length = (int) map.lengths[row];
+      int offset = (int) map.offsets[row];
+      OrcMap result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      TypeDescription keyType = childrenTypes.get(0);
+      TypeDescription valueType = childrenTypes.get(1);
+      if (previous == null ||
+          previous.getClass() != OrcMap.class) {
+        result = new OrcMap(schema);
+      } else {
+        result = (OrcMap) previous;
+        // I couldn't think of a good way to reuse the keys and value objects
+        // without even more allocations, so take the easy and safe approach.
+        result.clear();
+      }
+      for(int e=0; e < length; ++e) {
+        result.put(nextValue(map.keys, e + offset, keyType, null),
+                   nextValue(map.values, e + offset, valueType, null));
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  static Writable nextValue(ColumnVector vector,
+                            int row,
+                            TypeDescription schema,
+                            Object previous) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return nextBoolean(vector, row, previous);
+      case BYTE:
+        return nextByte(vector, row, previous);
+      case SHORT:
+        return nextShort(vector, row, previous);
+      case INT:
+        return nextInt(vector, row, previous);
+      case LONG:
+        return nextLong(vector, row, previous);
+      case FLOAT:
+        return nextFloat(vector, row, previous);
+      case DOUBLE:
+        return nextDouble(vector, row, previous);
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return nextString(vector, row, previous);
+      case BINARY:
+        return nextBinary(vector, row, previous);
+      case DECIMAL:
+        return nextDecimal(vector, row, previous);
+      case DATE:
+        return nextDate(vector, row, previous);
+      case TIMESTAMP:
+        return nextTimestamp(vector, row, previous);
+      case STRUCT:
+        return nextStruct(vector, row, schema, previous);
+      case UNION:
+        return nextUnion(vector, row, schema, previous);
+      case LIST:
+        return nextList(vector, row, schema, previous);
+      case MAP:
+        return nextMap(vector, row, schema, previous);
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema);
+    }
+  }
+}
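
A hedged sketch of driving the reader directly, outside a full job: the file path is an assumption, and the split simply spans the whole file. next() refills the VectorizedRowBatch as needed and copies one row per call into the reused value.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.orc.mapred.OrcInputFormat;
    import org.apache.orc.mapred.OrcStruct;

    public class OrcReadLoopSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        Path path = new Path("/tmp/people.orc");
        long length = path.getFileSystem(conf).getFileStatus(path).getLen();
        FileSplit split = new FileSplit(path, 0, length, new String[0]);

        OrcInputFormat<OrcStruct> format = new OrcInputFormat<>();
        RecordReader<NullWritable, OrcStruct> reader =
            format.getRecordReader(split, conf, Reporter.NULL);
        NullWritable key = reader.createKey();
        OrcStruct value = reader.createValue();   // sized to the file schema
        while (reader.next(key, value)) {
          System.out.println(value);              // e.g. {alice, 29}
        }
        reader.close();
      }
    }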

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
new file mode 100644
index 0000000..4237656
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcRecordWriter.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
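+/**
+ * A RecordWriter that buffers rows into a VectorizedRowBatch and hands
+ * full batches to the underlying ORC Writer.
+ */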
+public class OrcRecordWriter<V extends Writable>
+    implements RecordWriter<NullWritable, V> {
+  private final Writer writer;
+  private final VectorizedRowBatch batch;
+  private final TypeDescription schema;
+  private final boolean isTopStruct;
+
+  public OrcRecordWriter(Writer writer) {
+    this.writer = writer;
+    schema = writer.getSchema();
+    this.batch = schema.createRowBatch();
+    isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+  }
+
+  static void setLongValue(ColumnVector vector, int row, long value) {
+    ((LongColumnVector) vector).vector[row] = value;
+  }
+
+  static void setDoubleValue(ColumnVector vector, int row, double value) {
+    ((DoubleColumnVector) vector).vector[row] = value;
+  }
+
+  static void setBinaryValue(ColumnVector vector, int row,
+                             BinaryComparable value) {
+    ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
+        value.getLength());
+  }
+
+  static void setBinaryValue(ColumnVector vector, int row,
+                             BinaryComparable value, int maxLength) {
+    ((BytesColumnVector) vector).setVal(row, value.getBytes(), 0,
+        Math.min(maxLength, value.getLength()));
+  }
+
+  private static final ThreadLocal<byte[]> SPACE_BUFFER =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          byte[] result = new byte[100];
+          Arrays.fill(result, (byte) ' ');
+          return result;
+        }
+      };
+
+  static void setCharValue(BytesColumnVector vector,
+                           int row,
+                           Text value,
+                           int length) {
+    // we need to trim or pad the string with spaces to required length
+    int actualLength = value.getLength();
+    if (actualLength >= length) {
+      setBinaryValue(vector, row, value, length);
+    } else {
+      byte[] spaces = SPACE_BUFFER.get();
+      if (length - actualLength > spaces.length) {
+        spaces = new byte[length - actualLength];
+        Arrays.fill(spaces, (byte)' ');
+        SPACE_BUFFER.set(spaces);
+      }
+      vector.setConcat(row, value.getBytes(), 0, actualLength, spaces, 0,
+          length - actualLength);
+    }
+  }
+
+  static void setStructValue(TypeDescription schema,
+                             StructColumnVector vector,
+                             int row,
+                             OrcStruct value) {
+    List<TypeDescription> children = schema.getChildren();
+    for(int c=0; c < value.getNumFields(); ++c) {
+      setColumn(children.get(c), vector.fields[c], row, value.getFieldValue(c));
+    }
+  }
+
+  static void setUnionValue(TypeDescription schema,
+                            UnionColumnVector vector,
+                            int row,
+                            OrcUnion value) {
+    List<TypeDescription> children = schema.getChildren();
+    int tag = value.getTag() & 0xff;
+    vector.tags[row] = tag;
+    setColumn(children.get(tag), vector.fields[tag], row, value.getObject());
+  }
+
+  static void setListValue(TypeDescription schema,
+                           ListColumnVector vector,
+                           int row,
+                           OrcList value) {
+    TypeDescription elemType = schema.getChildren().get(0);
+    vector.offsets[row] = vector.childCount;
+    vector.lengths[row] = value.size();
+    vector.childCount += vector.lengths[row];
+    vector.child.ensureSize(vector.childCount, vector.offsets[row] != 0);
+    for(int e=0; e < vector.lengths[row]; ++e) {
+      setColumn(elemType, vector.child, (int) vector.offsets[row] + e,
+          (Writable) value.get(e));
+    }
+  }
+
+  static void setMapValue(TypeDescription schema,
+                          MapColumnVector vector,
+                          int row,
+                          OrcMap<?,?> value) {
+    TypeDescription keyType = schema.getChildren().get(0);
+    TypeDescription valueType = schema.getChildren().get(1);
+    vector.offsets[row] = vector.childCount;
+    vector.lengths[row] = value.size();
+    vector.childCount += vector.lengths[row];
+    vector.keys.ensureSize(vector.childCount, vector.offsets[row] != 0);
+    vector.values.ensureSize(vector.childCount, vector.offsets[row] != 0);
+    int e = 0;
+    for(Map.Entry<?,?> entry: value.entrySet()) {
+      setColumn(keyType, vector.keys, (int) vector.offsets[row] + e,
+          (Writable) entry.getKey());
+      setColumn(valueType, vector.values, (int) vector.offsets[row] + e,
+          (Writable) entry.getValue());
+      e += 1;
+    }
+  }
+
+  static void setColumn(TypeDescription schema,
+                        ColumnVector vector,
+                        int row,
+                        Writable value) {
+    if (value == null) {
+      vector.noNulls = false;
+      vector.isNull[row] = true;
+    } else {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          setLongValue(vector, row, ((BooleanWritable) value).get() ? 1 : 0);
+          break;
+        case BYTE:
+          setLongValue(vector, row, ((ByteWritable) value).get());
+          break;
+        case SHORT:
+          setLongValue(vector, row, ((ShortWritable) value).get());
+          break;
+        case INT:
+          setLongValue(vector, row, ((IntWritable) value).get());
+          break;
+        case LONG:
+          setLongValue(vector, row, ((LongWritable) value).get());
+          break;
+        case FLOAT:
+          setDoubleValue(vector, row, ((FloatWritable) value).get());
+          break;
+        case DOUBLE:
+          setDoubleValue(vector, row, ((DoubleWritable) value).get());
+          break;
+        case STRING:
+          setBinaryValue(vector, row, (Text) value);
+          break;
+        case CHAR:
+          setCharValue((BytesColumnVector) vector, row, (Text) value,
+              schema.getMaxLength());
+          break;
+        case VARCHAR:
+          setBinaryValue(vector, row, (Text) value, schema.getMaxLength());
+          break;
+        case BINARY:
+          setBinaryValue(vector, row, (BytesWritable) value);
+          break;
+        case DATE:
+          setLongValue(vector, row, ((DateWritable) value).getDays());
+          break;
+        case TIMESTAMP:
+          ((TimestampColumnVector) vector).set(row, (OrcTimestamp) value);
+          break;
+        case DECIMAL:
+          ((DecimalColumnVector) vector).set(row, (HiveDecimalWritable) value);
+          break;
+        case STRUCT:
+          setStructValue(schema, (StructColumnVector) vector, row,
+              (OrcStruct) value);
+          break;
+        case UNION:
+          setUnionValue(schema, (UnionColumnVector) vector, row,
+              (OrcUnion) value);
+          break;
+        case LIST:
+          setListValue(schema, (ListColumnVector) vector, row, (OrcList) value);
+          break;
+        case MAP:
+          setMapValue(schema, (MapColumnVector) vector, row, (OrcMap) value);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown type " + schema);
+      }
+    }
+  }
+
+  @Override
+  public void write(NullWritable nullWritable, V v) throws IOException {
+    // if the batch is full, write it out.
+    if (batch.size == batch.getMaxSize()) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+
+    // add the new row
+    int row = batch.size++;
+    if (isTopStruct) {
+      for(int f=0; f < schema.getChildren().size(); ++f) {
+        setColumn(schema.getChildren().get(f), batch.cols[f], row,
+            ((OrcStruct) v).getFieldValue(f));
+      }
+    } else {
+      setColumn(schema, batch.cols[0], row, v);
+    }
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+    writer.close();
+  }
+}
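
The write side, sketched under the same assumptions: it pairs the record writer with a directly created ORC Writer instead of going through the OutputFormat, and close() flushes the final partial batch.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;
    import org.apache.orc.mapred.OrcRecordWriter;
    import org.apache.orc.mapred.OrcStruct;

    public class OrcWriteLoopSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        TypeDescription schema =
            TypeDescription.fromString("struct<name:string,age:int>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/people.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));
        OrcRecordWriter<OrcStruct> out = new OrcRecordWriter<>(writer);

        OrcStruct row = (OrcStruct) OrcStruct.createValue(schema);
        row.setFieldValue("name", new Text("alice"));
        row.setFieldValue("age", new IntWritable(29));
        out.write(NullWritable.get(), row);

        out.close(null);   // flushes the partial batch and closes the file
      }
    }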

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
new file mode 100644
index 0000000..74b3b28
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcStruct.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
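+/**
+ * An ORC struct value: an ordered set of Writable fields that matches a
+ * struct TypeDescription.
+ */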
+public final class OrcStruct implements Writable {
+
+  private Writable[] fields;
+  private final TypeDescription schema;
+
+  public OrcStruct(TypeDescription schema) {
+    this.schema = schema;
+    fields = new Writable[schema.getChildren().size()];
+  }
+
+  public Writable getFieldValue(int fieldIndex) {
+    return fields[fieldIndex];
+  }
+
+  public void setFieldValue(int fieldIndex, Writable value) {
+    fields[fieldIndex] = value;
+  }
+
+  public int getNumFields() {
+    return fields.length;
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    for(int f=0; f < fields.length; ++f) {
+      output.writeBoolean(fields[f] != null);
+      if (fields[f] != null) {
+        fields[f].write(output);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    for(int f=0; f < fields.length; ++f) {
+      if (input.readBoolean()) {
+        if (fields[f] == null) {
+          fields[f] = createValue(schema.getChildren().get(f));
+        }
+        fields[f].readFields(input);
+      } else {
+        fields[f] = null;
+      }
+    }
+  }
+
+  public void setFieldValue(String fieldName, Writable value) {
+    int fieldIdx = schema.getFieldNames().indexOf(fieldName);
+    if (fieldIdx == -1) {
+      throw new IllegalArgumentException("Field " + fieldName +
+          " not found in " + schema);
+    }
+    fields[fieldIdx] = value;
+  }
+
+  public Writable getFieldValue(String fieldName) {
+    int fieldIdx = schema.getFieldNames().indexOf(fieldName);
+    if (fieldIdx == -1) {
+      throw new IllegalArgumentException("Field " + fieldName +
+          " not found in " + schema);
+    }
+    return fields[fieldIdx];
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != OrcStruct.class) {
+      return false;
+    } else {
+      OrcStruct oth = (OrcStruct) other;
+      if (fields.length != oth.fields.length) {
+        return false;
+      }
+      for(int i=0; i < fields.length; ++i) {
+        if (fields[i] == null) {
+          if (oth.fields[i] != null) {
+            return false;
+          }
+        } else {
+          if (!fields[i].equals(oth.fields[i])) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    int result = fields.length;
+    for(Object field: fields) {
+      if (field != null) {
+        result ^= field.hashCode();
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("{");
+    for(int i=0; i < fields.length; ++i) {
+      if (i != 0) {
+        buffer.append(", ");
+      }
+      buffer.append(fields[i]);
+    }
+    buffer.append("}");
+    return buffer.toString();
+  }
+
+  /* Routines for creating the Writable that matches a TypeDescription */
+
+  public static Writable createValue(TypeDescription type) {
+    switch (type.getCategory()) {
+      case BOOLEAN: return new BooleanWritable();
+      case BYTE: return new ByteWritable();
+      case SHORT: return new ShortWritable();
+      case INT: return new IntWritable();
+      case LONG: return new LongWritable();
+      case FLOAT: return new FloatWritable();
+      case DOUBLE: return new DoubleWritable();
+      case BINARY: return new BytesWritable();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new Text();
+      case DATE:
+        return new DateWritable();
+      case TIMESTAMP:
+        return new OrcTimestamp();
+      case DECIMAL:
+        return new HiveDecimalWritable();
+      case STRUCT: {
+        OrcStruct result = new OrcStruct(type);
+        int c = 0;
+        for(TypeDescription child: type.getChildren()) {
+          result.setFieldValue(c++, createValue(child));
+        }
+        return result;
+      }
+      case UNION: return new OrcUnion(type);
+      case LIST: return new OrcList(type);
+      case MAP: return new OrcMap(type);
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+    }
+  }
+}
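
A brief illustrative sketch of createValue: for a struct it recursively pre-populates every field, so the returned object can be filled in place.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcStruct;

    public class OrcStructSketch {
      public static void main(String[] args) {
        TypeDescription schema =
            TypeDescription.fromString("struct<name:string,age:int>");
        OrcStruct struct = (OrcStruct) OrcStruct.createValue(schema);
        ((Text) struct.getFieldValue("name")).set("alice");
        ((IntWritable) struct.getFieldValue("age")).set(29);
        System.out.println(struct);   // {alice, 29}
      }
    }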

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
new file mode 100644
index 0000000..200a966
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcTimestamp.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Timestamp;
+
+/**
+ * A subclass of java.sql.Timestamp that implements Hadoop's Writable
+ * interface.
+ */
+public class OrcTimestamp extends Timestamp implements Writable {
+
+  public OrcTimestamp() {
+    super(0);
+  }
+
+  public OrcTimestamp(long time) {
+    super(time);
+  }
+
+  public OrcTimestamp(String timeStr) {
+    super(0);
+    Timestamp t = Timestamp.valueOf(timeStr);
+    setTime(t.getTime());
+    setNanos(t.getNanos());
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeLong(getTime());
+    output.writeInt(getNanos());
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    setTime(input.readLong());
+    // setNanos must come after setTime, which overwrites the nanos field.
+    setNanos(input.readInt());
+  }
+
+  public void set(String timeStr) {
+    Timestamp t = Timestamp.valueOf(timeStr);
+    setTime(t.getTime());
+    setNanos(t.getNanos());
+  }
+}
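
The wire format above is just the millisecond time followed by the nanos
field, so a round trip through Hadoop's data buffers restores the value
exactly. A small sketch, reusing the stock Hadoop buffer classes (the
timestamp literal is only an example):

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    OrcTimestamp original = new OrcTimestamp("2016-04-01 12:34:56.9");
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);                      // long millis, then int nanos
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    OrcTimestamp copy = new OrcTimestamp();
    copy.readFields(in);                      // copy now equals original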

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java
new file mode 100644
index 0000000..3e7c909
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcUnion.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * An in-memory representation of a union type.
+ */
+public final class OrcUnion implements Writable {
+  private byte tag;
+  private Writable object;
+  private final TypeDescription schema;
+
+  public OrcUnion(TypeDescription schema) {
+    this.schema = schema;
+  }
+
+  public void set(byte tag, Writable object) {
+    this.tag = tag;
+    this.object = object;
+  }
+
+  public byte getTag() {
+    return tag;
+  }
+
+  public Writable getObject() {
+    return object;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != OrcUnion.class) {
+      return false;
+    }
+    OrcUnion oth = (OrcUnion) other;
+    if (tag != oth.tag) {
+      return false;
+    } else if (object == null) {
+      return oth.object == null;
+    } else {
+      return object.equals(oth.object);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    int result = tag;
+    if (object != null) {
+      result ^= object.hashCode();
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "uniontype(" + Integer.toString(tag & 0xff) + ", " + object + ")";
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeByte(tag);
+    output.writeBoolean(object != null);
+    if (object != null) {
+      object.write(output);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    byte oldTag = tag;
+    tag = input.readByte();
+    if (input.readBoolean()) {
+      // Reuse the previous value object unless the tag switched branches.
+      if (oldTag != tag || object == null) {
+        object = OrcStruct.createValue(schema.getChildren().get(tag));
+      }
+      object.readFields(input);
+    } else {
+      object = null;
+    }
+  }
+}
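
Note the reuse in readFields above: the existing value object is kept
whenever the incoming tag matches the previous one, so repeated reads of
the same branch avoid reallocating the value. Writing a union is just
picking a branch by tag; a short sketch (the type string is illustrative,
in the same syntax the tests use):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.TypeDescription;

    TypeDescription type =
        TypeDescription.fromString("uniontype<int,string>");
    OrcUnion union = new OrcUnion(type);
    union.set((byte) 0, new IntWritable(7));    // branch 0 holds an int
    union.set((byte) 1, new Text("fallback"));  // switch to branch 1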

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java b/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java
new file mode 100644
index 0000000..c95c1f2
--- /dev/null
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/package-info.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>
+ * This package provides convenient access to ORC files using Hadoop's
+ * MapReduce InputFormat and OutputFormat.
+ * </p>
+ *
+ * <p>
+ * For reading, set the InputFormat to OrcInputFormat; your mapper will
+ * then receive one OrcStruct object for each row. (Note that the root
+ * type of an ORC file may be any type rather than a struct; in that case
+ * the value will be the corresponding Writable type from the table
+ * below.)
+ * </p>
+ *
+ * <p>
+ * The mapping of types is:
+ * </p>
+ * <table summary="Mapping of ORC types to Writable types"
+ *        border="1">
+ *   <colgroup>
+ *     <col width="25%">
+ *     <col width="75%">
+ *   </colgroup>
+ *   <thead>
+ *     <tr><th>ORC Type</th><th>Writable Type</th></tr>
+ *   </thead>
+ *   <tbody>
+ *     <tr><td>array</td><td>OrcList</td></tr>
+ *     <tr><td>binary</td><td>BytesWritable</td></tr>
+ *     <tr><td>bigint</td><td>LongWritable</td></tr>
+ *     <tr><td>boolean</td><td>BooleanWritable</td></tr>
+ *     <tr><td>char</td><td>Text</td></tr>
+ *     <tr><td>date</td><td>DateWritable</td></tr>
+ *     <tr><td>decimal</td><td>HiveDecimalWritable</td></tr>
+ *     <tr><td>double</td><td>DoubleWritable</td></tr>
+ *     <tr><td>float</td><td>FloatWritable</td></tr>
+ *     <tr><td>int</td><td>IntWritable</td></tr>
+ *     <tr><td>map</td><td>OrcMap</td></tr>
+ *     <tr><td>smallint</td><td>ShortWritable</td></tr>
+ *     <tr><td>string</td><td>Text</td></tr>
+ *     <tr><td>struct</td><td>OrcStruct</td></tr>
+ *     <tr><td>timestamp</td><td>OrcTimestamp</td></tr>
+ *     <tr><td>tinyint</td><td>ByteWritable</td></tr>
+ *     <tr><td>uniontype</td><td>OrcUnion</td></tr>
+ *     <tr><td>varchar</td><td>Text</td></tr>
+ *   </tbody>
+ * </table>
+ *
+ * <p>
+ * For writing, set the OutputFormat to OrcOutputFormat and define the
+ * property "orc.schema" in your configuration. The property gives the
+ * type of the file, written in Hive type syntax, such as
+ * "struct&lt;x:int,y:string,z:timestamp&gt;" for a row with an integer,
+ * string, and timestamp. You can create an example object using:
+ * </p>
+ *<pre>{@code
+ *String typeStr = "struct<x:int,y:string,z:timestamp>";
+ *OrcStruct row = (OrcStruct) OrcStruct.createValue(
+ *    TypeDescription.fromString(typeStr));
+ *}</pre>
+ *
+ * <p>
+ * Please look at the OrcConf class for the configuration knobs that are
+ * available.
+ * </p>
+ */
+package org.apache.orc.mapred;
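
A driver tying this javadoc together might be configured as below. This is
a hedged configuration sketch rather than a complete job: OrcInputFormat
and OrcOutputFormat are the formats named above, "orc.schema" is the
property this javadoc defines, and the paths are placeholders.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.orc.mapred.OrcInputFormat;
    import org.apache.orc.mapred.OrcOutputFormat;

    JobConf job = new JobConf();
    job.setInputFormat(OrcInputFormat.class);   // rows arrive as OrcStruct
    job.setOutputFormat(OrcOutputFormat.class);
    // Output schema in Hive type syntax, as described above.
    job.set("orc.schema", "struct<x:int,y:string,z:timestamp>");
    FileInputFormat.setInputPaths(job, new Path("/placeholder/input"));
    FileOutputFormat.setOutputPath(job, new Path("/placeholder/output"));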

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java
new file mode 100644
index 0000000..4ac7ca9
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcList.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestOrcList {
+
+  // Round-trip helper: serialize source and read the bytes back into
+  // destination, exercising both write() and readFields().
+  static void cloneWritable(Writable source,
+                            Writable destination) throws IOException {
+    DataOutputBuffer out = new DataOutputBuffer(1024);
+    source.write(out);
+    out.flush();
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.getLength());
+    destination.readFields(in);
+  }
+
+  @Test
+  public void testRead() throws IOException {
+    TypeDescription type =
+        TypeDescription.createList(TypeDescription.createInt());
+    OrcList<IntWritable> expected = new OrcList<>(type);
+    OrcList<IntWritable> actual = new OrcList<>(type);
+    expected.add(new IntWritable(123));
+    expected.add(new IntWritable(456));
+    expected.add(new IntWritable(789));
+    assertNotEquals(expected, actual);
+    cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.clear();
+    cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.add(null);
+    expected.add(new IntWritable(500));
+    cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java
new file mode 100644
index 0000000..34e1feb
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcMap.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestOrcMap {
+
+  @Test
+  public void testRead() throws IOException {
+    TypeDescription type =
+        TypeDescription.createMap(TypeDescription.createInt(),
+            TypeDescription.createLong());
+    OrcMap<IntWritable, LongWritable> expected = new OrcMap<>(type);
+    OrcMap<IntWritable, LongWritable> actual = new OrcMap<>(type);
+    expected.put(new IntWritable(999), new LongWritable(1111));
+    expected.put(new IntWritable(888), new LongWritable(2222));
+    expected.put(new IntWritable(777), new LongWritable(3333));
+    assertNotEquals(expected, actual);
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.clear();
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.put(new IntWritable(666), null);
+    expected.put(new IntWritable(1), new LongWritable(777));
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
new file mode 100644
index 0000000..4af6a1c
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcStruct.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.TypeDescription;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestOrcStruct {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testRead() throws IOException {
+    TypeDescription type =
+        TypeDescription.createStruct()
+          .addField("f1", TypeDescription.createInt())
+          .addField("f2", TypeDescription.createLong())
+          .addField("f3", TypeDescription.createString());
+    OrcStruct expected = new OrcStruct(type);
+    OrcStruct actual = new OrcStruct(type);
+    assertEquals(3, expected.getNumFields());
+    expected.setFieldValue(0, new IntWritable(1));
+    expected.setFieldValue(1, new LongWritable(2));
+    expected.setFieldValue(2, new Text("wow"));
+    assertEquals(147710, expected.hashCode());
+    assertNotEquals(expected, actual);
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.setFieldValue(0, null);
+    expected.setFieldValue(1, null);
+    expected.setFieldValue(2, null);
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    assertEquals(3, expected.hashCode());
+    expected.setFieldValue(1, new LongWritable(111));
+    assertEquals(111, ((LongWritable) expected.getFieldValue(1)).get());
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testFieldAccess() {
+    OrcStruct struct = new OrcStruct(TypeDescription.fromString
+        ("struct<i:int,j:double,k:string>"));
+    struct.setFieldValue("j", new DoubleWritable(1.5));
+    struct.setFieldValue("k", new Text("Moria"));
+    struct.setFieldValue(0, new IntWritable(42));
+    assertEquals(new IntWritable(42), struct.getFieldValue("i"));
+    assertEquals(new DoubleWritable(1.5), struct.getFieldValue(1));
+    assertEquals(new Text("Moria"), struct.getFieldValue("k"));
+  }
+
+  @Test
+  public void testBadFieldRead() {
+    OrcStruct struct = new OrcStruct(TypeDescription.fromString
+        ("struct<i:int,j:double,k:string>"));
+    thrown.expect(IllegalArgumentException.class);
+    struct.getFieldValue("bad");
+  }
+
+  @Test
+  public void testBadFieldWrite() {
+    OrcStruct struct = new OrcStruct(TypeDescription.fromString
+        ("struct<i:int,j:double,k:string>"));
+    thrown.expect(IllegalArgumentException.class);
+    struct.setFieldValue("bad", new Text("foobar"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java
new file mode 100644
index 0000000..925eb8a
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcTimestamp.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestOrcTimestamp {
+
+  @Test
+  public void testRead() throws IOException {
+    OrcTimestamp expected = new OrcTimestamp("2016-04-01 12:34:56.9");
+    OrcTimestamp actual = new OrcTimestamp();
+    assertNotEquals(expected, actual);
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    assertEquals("2016-04-01 12:34:56.9", actual.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java
----------------------------------------------------------------------
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java
new file mode 100644
index 0000000..82fd94f
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcUnion.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.TypeDescription;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestOrcUnion {
+
+  @Test
+  public void testRead() throws IOException {
+    TypeDescription type =
+        TypeDescription.fromString("uniontype<int,bigint,string>");
+    OrcUnion expected = new OrcUnion(type);
+    OrcUnion actual = new OrcUnion(type);
+    expected.set((byte) 2, new Text("foo"));
+    assertEquals(131367, expected.hashCode());
+    assertNotEquals(expected, actual);
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.set((byte) 0, new IntWritable(111));
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.set((byte)1, new LongWritable(4567));
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.set((byte) 1, new LongWritable(12345));
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+    expected.set((byte) 1, null);
+    TestOrcList.cloneWritable(expected, actual);
+    assertEquals(expected, actual);
+  }
+}