You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/10/17 18:25:07 UTC

[05/15] orc git commit: more updates

more updates


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/1752e172
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/1752e172
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/1752e172

Branch: refs/heads/orc-72
Commit: 1752e172a0f7b16fd193f75eb57d10ca05ab0b91
Parents: 5b37113
Author: Owen O'Malley <om...@apache.org>
Authored: Wed Oct 5 16:04:16 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Oct 10 13:59:16 2016 -0700

----------------------------------------------------------------------
 java/bench/pom.xml                              |  13 +
 .../orc/bench/ColumnProjectionBenchmark.java    |  10 +-
 .../org/apache/orc/bench/CompressionKind.java   |  56 ++++
 .../java/org/apache/orc/bench/GithubToAvro.java |   4 +-
 .../java/org/apache/orc/bench/GithubToJson.java |  21 +-
 .../java/org/apache/orc/bench/GithubToOrc.java  |   6 +-
 .../org/apache/orc/bench/GithubToParquet.java   |   2 +-
 .../java/org/apache/orc/bench/SalesToJson.java  |  16 +-
 .../java/org/apache/orc/bench/SalesToOrc.java   |   2 +-
 .../java/org/apache/orc/bench/TaxiToJson.java   |  64 +---
 .../java/org/apache/orc/bench/TaxiToOrc.java    |  63 +---
 .../java/org/apache/orc/bench/Utilities.java    |  86 ++++++
 .../org/apache/orc/bench/avro/AvroWriter.java   |  23 --
 .../org/apache/orc/bench/json/JsonWriter.java   |  69 +++++
 .../orc/bench/parquet/ConverterParent.java      |  24 ++
 .../bench/parquet/DataWritableReadSupport.java  | 200 +++++++++++++
 .../parquet/DataWritableRecordConverter.java    |  49 ++++
 .../bench/parquet/DataWritableWriteSupport.java |  61 ++++
 .../orc/bench/parquet/ETypeConverter.java       | 292 +++++++++++++++++++
 .../parquet/FilterPredicateLeafBuilder.java     |  80 +++++
 .../bench/parquet/HiveCollectionConverter.java  | 196 +++++++++++++
 .../orc/bench/parquet/HiveGroupConverter.java   |  79 +++++
 .../orc/bench/parquet/HiveSchemaConverter.java  | 140 +++++++++
 .../orc/bench/parquet/HiveStructConverter.java  | 192 ++++++++++++
 .../orc/bench/parquet/LeafFilterFactory.java    | 200 +++++++++++++
 .../parquet/MapredParquetOutputFormat.java      | 129 ++++++++
 .../org/apache/orc/bench/parquet/NanoTime.java  |  68 +++++
 .../apache/orc/bench/parquet/NanoTimeUtils.java | 113 +++++++
 .../ParquetFilterPredicateConverter.java        | 143 +++++++++
 .../parquet/ParquetRecordReaderWrapper.java     | 276 ++++++++++++++++++
 .../org/apache/orc/bench/parquet/Repeated.java  | 193 ++++++++++++
 java/pom.xml                                    |   5 +
 .../java/org/apache/orc/tools/PrintData.java    |   8 +-
 33 files changed, 2699 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/pom.xml
----------------------------------------------------------------------
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index f40f21b..caee888 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -37,6 +37,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
       <artifactId>orc-tools</artifactId>
     </dependency>
 
@@ -71,6 +75,10 @@
       <artifactId>parquet-hadoop</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.jodd</groupId>
+      <artifactId>jodd-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.openjdk.jmh</groupId>
       <artifactId>jmh-core</artifactId>
     </dependency>
@@ -89,6 +97,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
index 4b17819..c53911f 100644
--- a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
+++ b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.TrackingLocalFileSystem;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
@@ -40,6 +38,8 @@ import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.parquet.DataWritableReadSupport;
+import org.apache.orc.bench.parquet.ParquetRecordReaderWrapper;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.openjdk.jmh.annotations.AuxCounters;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -58,12 +58,9 @@ import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.GZIPInputStream;
 
 @BenchmarkMode(Mode.AverageTime)
 @Warmup(iterations=1, time=10, timeUnit = TimeUnit.SECONDS)
@@ -171,8 +168,7 @@ public class ColumnProjectionBenchmark {
     NullWritable nada = NullWritable.get();
     FileSplit split = new FileSplit(path, 0, Long.MAX_VALUE, new String[]{});
     org.apache.hadoop.mapred.RecordReader<NullWritable,ArrayWritable> recordReader =
-        new ParquetRecordReaderWrapper(inputFormat, split, conf,
-            Reporter.NULL);
+        new ParquetRecordReaderWrapper(inputFormat, split, conf);
     ArrayWritable value = recordReader.createValue();
     while (recordReader.next(nada, value)) {
       counters.records += 1;

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/CompressionKind.java b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
new file mode 100644
index 0000000..9fe9ba9
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/CompressionKind.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench;
+
+import io.airlift.compress.snappy.SnappyCodec;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Created by owen on 10/5/16.
+ */
+public enum CompressionKind {
+  NONE(""),
+  ZLIB(".gz"),
+  SNAPPY(".snappy");
+
+  CompressionKind(String extendsion) {
+    this.extension = extendsion;
+  }
+
+  private final String extension;
+
+  public String getExtension() {
+    return extension;
+  }
+
+  public OutputStream create(OutputStream out) throws IOException {
+    switch (this) {
+      case NONE:
+        return out;
+      case ZLIB:
+        return new GZIPOutputStream(out);
+      case SNAPPY:
+        return new SnappyCodec().createOutputStream(out);
+      default:
+        throw new IllegalArgumentException("Unhandled kind " + this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
index ee882e7..eb94ff2 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
@@ -28,12 +28,12 @@ import org.apache.orc.bench.json.JsonReader;
 public class GithubToAvro {
 
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("github.schema");
+    TypeDescription schema = Utilities.loadSchema("github.schema");
     Configuration conf = new Configuration();
     AvroWriter writer = new AvroWriter(new Path(args[0]), schema, conf,
         TaxiToAvro.getCodec(args[1]));
     VectorizedRowBatch batch = schema.createRowBatch();
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
+    for(String inFile: Utilities.sliceArray(args, 2)) {
       JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
       while (reader.nextBatch(batch)) {
         writer.writeBatch(batch);

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
index 1dd23de..cf6ca33 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
@@ -23,29 +23,22 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.json.JsonReader;
-import org.apache.orc.tools.FileDump;
-
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import org.apache.orc.bench.json.JsonWriter;
 
 public class GithubToJson {
 
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("github.schema");
+    TypeDescription schema = Utilities.loadSchema("github.schema");
     Path path = new Path(args[0]);
     VectorizedRowBatch batch = schema.createRowBatch();
     Configuration conf = new Configuration();
-    Writer output = new OutputStreamWriter(TaxiToJson.getCodec(args[1])
-        .create(path.getFileSystem(conf).create(path)));
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
-      JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
-      while (reader.nextBatch(batch)) {
-        for(int r=0; r < batch.size; ++r) {
-          FileDump.printRow(output, batch, schema, r);
-          output.write("\n");
+    try (JsonWriter writer = new JsonWriter(path, schema, conf, args[1])) {
+      for (String inFile : Utilities.sliceArray(args, 2)) {
+        JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
+        while (reader.nextBatch(batch)) {
+          writer.writeBatch(batch);
         }
       }
     }
-    output.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
index ebd6443..a04b08e 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
@@ -29,13 +29,13 @@ import org.apache.orc.bench.json.JsonReader;
 public class GithubToOrc {
 
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("github.schema");
+    TypeDescription schema = Utilities.loadSchema("github.schema");
     VectorizedRowBatch batch = schema.createRowBatch();
     Configuration conf = new Configuration();
     Writer writer = OrcFile.createWriter(new Path(args[0]),
         OrcFile.writerOptions(conf).setSchema(schema)
-            .compress(TaxiToOrc.getCodec(args[1])));
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
+            .compress(Utilities.getCodec(args[1])));
+    for(String inFile: Utilities.sliceArray(args, 2)) {
       JsonReader reader = new JsonReader(new Path(inFile), conf, schema);
       while (reader.nextBatch(batch)) {
         writer.addRowBatch(batch);

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
index db88c52..b1678aa 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
@@ -36,7 +36,7 @@ import java.util.Properties;
 public class GithubToParquet {
 
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("github.schema");
+    TypeDescription schema = Utilities.loadSchema("github.schema");
     VectorizedRowBatch batch = schema.createRowBatch();
     JobConf conf = new JobConf();
     conf.set("mapred.task.id", "attempt_0_0_m_0_0");

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/SalesToJson.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToJson.java b/java/bench/src/java/org/apache/orc/bench/SalesToJson.java
index 500b6c9..3d51cc0 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToJson.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToJson.java
@@ -22,11 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.tools.FileDump;
-
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.zip.GZIPOutputStream;
+import org.apache.orc.bench.json.JsonWriter;
 
 public class SalesToJson {
 
@@ -36,14 +32,10 @@ public class SalesToJson {
     Path path = new Path(args[0]);
     VectorizedRowBatch batch = schema.createRowBatch();
     Configuration conf = new Configuration();
-    Writer output = new OutputStreamWriter(TaxiToJson.getCodec(args[1])
-        .create(path.getFileSystem(conf).create(path)));
-    while (sales.nextBatch(batch)) {
-      for(int r=0; r < batch.size; ++r) {
-        FileDump.printRow(output, batch, schema, r);
-        output.write("\n");
+    try (JsonWriter output = new JsonWriter(path, schema, conf, args[1])) {
+      while (sales.nextBatch(batch)) {
+        output.writeBatch(batch);
       }
     }
-    output.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/SalesToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToOrc.java b/java/bench/src/java/org/apache/orc/bench/SalesToOrc.java
index 4e715b2..d3b2615 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToOrc.java
@@ -33,7 +33,7 @@ public class SalesToOrc {
     Writer writer = OrcFile.createWriter(new Path(args[0]),
         OrcFile.writerOptions(conf)
             .setSchema(sales.getSchema())
-            .compress(TaxiToOrc.getCodec(args[1])));
+            .compress(Utilities.getCodec(args[1])));
     while (sales.nextBatch(batch)) {
       writer.addRowBatch(batch);
     }

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
index 4b8ca8c..8963230 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
@@ -23,72 +23,22 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.bench.csv.CsvReader;
-import org.apache.orc.tools.FileDump;
-import org.iq80.snappy.SnappyOutputStream;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.zip.GZIPOutputStream;
+import org.apache.orc.bench.json.JsonWriter;
 
 public class TaxiToJson {
 
-  public enum CompressionKind {
-    NONE(""),
-    ZLIB(".gz"),
-    SNAPPY(".snappy");
-
-    CompressionKind(String extendsion) {
-      this.extension = extendsion;
-    }
-
-    private final String extension;
-
-    public String getExtension() {
-      return extension;
-    }
-
-    public OutputStream create(OutputStream out) throws IOException {
-      switch (this) {
-        case NONE:
-          return out;
-        case ZLIB:
-          return new GZIPOutputStream(out);
-        case SNAPPY:
-          return new SnappyOutputStream(out);
-        default:
-          throw new IllegalArgumentException("Unhandled kind " + this);
-      }
-    }
-  }
-
-  public static CompressionKind getCodec(String name) {
-    if ("none".equals(name)) {
-      return CompressionKind.NONE;
-    } else if ("zlib".equals(name)) {
-      return CompressionKind.ZLIB;
-    } else if ("snappy".equals(name)) {
-      return CompressionKind.SNAPPY;
-    } throw new IllegalArgumentException("Unhnadled kind " + name);
-  }
-
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
+    TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     Path path = new Path(args[0]);
     VectorizedRowBatch batch = schema.createRowBatch();
     Configuration conf = new Configuration();
-    Writer output = new OutputStreamWriter(getCodec(args[1])
-        .create(path.getFileSystem(conf).create(path)));
-    for(String inFile: TaxiToOrc.sliceArray(args, 2)) {
-      CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
-      while (reader.nextBatch(batch)) {
-        for(int r=0; r < batch.size; ++r) {
-          FileDump.printRow(output, batch, schema, r);
-          output.write("\n");
+    try (JsonWriter output = new JsonWriter(path, schema, conf, args[1])) {
+      for(String inFile: Utilities.sliceArray(args, 2)) {
+        CsvReader taxi = new CsvReader(new Path(inFile), conf, schema);
+        while (taxi.nextBatch(batch)) {
+          output.writeBatch(batch);
         }
       }
     }
-    output.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
index 2588c72..e95f794 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
@@ -33,72 +33,15 @@ import java.util.Iterator;
 
 public class TaxiToOrc {
 
-  public static TypeDescription loadSchema(String name) throws IOException {
-    InputStream in = TaxiToOrc.class.getClassLoader().getResourceAsStream(name);
-    byte[] buffer= new byte[1 * 1024];
-    int len = in.read(buffer);
-    StringBuilder string = new StringBuilder();
-    while (len > 0) {
-      for(int i=0; i < len; ++i) {
-        // strip out
-        if (buffer[i] != '\n' && buffer[i] != ' ') {
-          string.append((char) buffer[i]);
-        }
-      }
-      len = in.read(buffer);
-    }
-    return TypeDescription.fromString(string.toString());
-  }
-
-  public static CompressionKind getCodec(String compression) {
-    if ("none".equals(compression)) {
-      return CompressionKind.NONE;
-    } else if ("zlib".equals(compression)) {
-      return CompressionKind.ZLIB;
-    } else if ("snappy".equals(compression)) {
-      return CompressionKind.SNAPPY;
-    } else {
-      throw new IllegalArgumentException("Unknown compression " + compression);
-    }
-  }
-
-  public static Iterable<String> sliceArray(final String[] array,
-                                            final int start) {
-    return new Iterable<String>() {
-      String[] values = array;
-      int posn = start;
-
-      @Override
-      public Iterator<String> iterator() {
-        return new Iterator<String>() {
-          @Override
-          public boolean hasNext() {
-            return posn < values.length;
-          }
-
-          @Override
-          public String next() {
-            return values[posn++];
-          }
-
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException("No remove");
-          }
-        };
-      }
-    };
-  }
-
   public static void main(String[] args) throws Exception {
-    TypeDescription schema = loadSchema("nyc-taxi.schema");
+    TypeDescription schema = Utilities.loadSchema("nyc-taxi.schema");
     VectorizedRowBatch batch = schema.createRowBatch();
     Configuration conf = new Configuration();
     Writer writer = OrcFile.createWriter(new Path(args[0]),
         OrcFile.writerOptions(conf)
             .setSchema(schema)
-            .compress(getCodec(args[1])));
-    for(String inFile: sliceArray(args, 2)) {
+            .compress(Utilities.getCodec(args[1])));
+    for(String inFile: Utilities.sliceArray(args, 2)) {
       CsvReader reader = new CsvReader(new Path(inFile), conf, schema);
       while (reader.nextBatch(batch)) {
         writer.addRowBatch(batch);

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/Utilities.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/Utilities.java b/java/bench/src/java/org/apache/orc/bench/Utilities.java
new file mode 100644
index 0000000..9a95ae9
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/Utilities.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench;
+
+import org.apache.orc.CompressionKind;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+public class Utilities {
+
+  public static TypeDescription loadSchema(String name) throws IOException {
+    InputStream in = Utilities.class.getClassLoader().getResourceAsStream(name);
+    byte[] buffer= new byte[1 * 1024];
+    int len = in.read(buffer);
+    StringBuilder string = new StringBuilder();
+    while (len > 0) {
+      for(int i=0; i < len; ++i) {
+        // strip out
+        if (buffer[i] != '\n' && buffer[i] != ' ') {
+          string.append((char) buffer[i]);
+        }
+      }
+      len = in.read(buffer);
+    }
+    return TypeDescription.fromString(string.toString());
+  }
+
+  public static CompressionKind getCodec(String compression) {
+    if ("none".equals(compression)) {
+      return CompressionKind.NONE;
+    } else if ("zlib".equals(compression)) {
+      return CompressionKind.ZLIB;
+    } else if ("snappy".equals(compression)) {
+      return CompressionKind.SNAPPY;
+    } else {
+      throw new IllegalArgumentException("Unknown compression " + compression);
+    }
+  }
+
+  public static Iterable<String> sliceArray(final String[] array,
+                                            final int start) {
+    return new Iterable<String>() {
+      String[] values = array;
+      int posn = start;
+
+      @Override
+      public Iterator<String> iterator() {
+        return new Iterator<String>() {
+          @Override
+          public boolean hasNext() {
+            return posn < values.length;
+          }
+
+          @Override
+          public String next() {
+            return values[posn++];
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException("No remove");
+          }
+        };
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
index f9d3bad..2735a71 100644
--- a/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
+++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
@@ -46,29 +46,6 @@ import java.util.Properties;
 
 public class AvroWriter {
 
-  static Properties setHiveSchema(TypeDescription schema) {
-    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
-      throw new IllegalArgumentException("Assumes struct type as root, not " +
-          schema);
-    }
-    StringBuilder fieldNames = new StringBuilder();
-    StringBuilder fieldTypes = new StringBuilder();
-    List<String> childNames = schema.getFieldNames();
-    List<TypeDescription> childTypes = schema.getChildren();
-    for(int f=0; f < childNames.size(); ++f) {
-      if (f != 0) {
-        fieldNames.append(',');
-        fieldTypes.append(',');
-      }
-      fieldNames.append(childNames.get(f));
-      fieldTypes.append(childTypes.get(f).toString());
-    }
-    Properties properties = new Properties();
-    properties.put("columns", fieldNames.toString());
-    properties.put("columns.types", fieldTypes.toString());
-    return properties;
-  }
-
   interface AvroConverter {
     Object convert(ColumnVector vector, int row);
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/json/JsonWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/json/JsonWriter.java b/java/bench/src/java/org/apache/orc/bench/json/JsonWriter.java
new file mode 100644
index 0000000..9f03197
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/json/JsonWriter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.json;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.CompressionKind;
+import org.apache.orc.tools.PrintData;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONWriter;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+public class JsonWriter implements Closeable {
+  private final Writer outStream;
+  private final JSONWriter writer;
+  private final TypeDescription schema;
+
+  public JsonWriter(Path path, TypeDescription schema,
+                    Configuration conf,
+                    String compression) throws IOException {
+    CompressionKind codec = CompressionKind.valueOf(compression);
+    path = path.suffix(".jsn" + codec.getExtension());
+    OutputStream file = path.getFileSystem(conf).create(path, true);
+    outStream = new OutputStreamWriter(codec.create(file));
+    writer = new JSONWriter(outStream);
+    this.schema = schema;
+  }
+
+  public void writeBatch(VectorizedRowBatch batch) throws IOException {
+    try {
+      for (int r = 0; r < batch.size; ++r) {
+        for (int f = 0; f < batch.cols.length; ++f) {
+          PrintData.printRow(writer, batch, schema, r);
+
+        }
+        outStream.append('\n');
+      }
+    } catch (JSONException je) {
+      throw new IOException("json problem", je);
+    }
+  }
+
+  public void close() throws IOException {
+    outStream.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/ConverterParent.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ConverterParent.java b/java/bench/src/java/org/apache/orc/bench/parquet/ConverterParent.java
new file mode 100644
index 0000000..afd6b76
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ConverterParent.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import org.apache.hadoop.io.Writable;
+
+import java.util.Map;
+
+interface ConverterParent {
+  void set(int index, Writable value);
+
+  Map<String, String> getMetadata();
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
new file mode 100644
index 0000000..0bcce1f
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableReadSupport.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+/**
+ *
+ * A MapWritableReadSupport
+ *
+ * Manages the translation between Hive and Parquet
+ *
+ */
+public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
+
+  public static final String HIVE_TABLE_AS_PARQUET_SCHEMA = "HIVE_TABLE_SCHEMA";
+  public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
+  private TypeDescription hiveTypeInfo;
+
+  /**
+   * Searchs for a fieldName into a parquet GroupType by ignoring string case.
+   * GroupType#getType(String fieldName) is case sensitive, so we use this method.
+   *
+   * @param groupType Group of field types where to search for fieldName
+   * @param fieldName The field what we are searching
+   * @return The Type object of the field found; null otherwise.
+   */
+  private static Type getFieldTypeIgnoreCase(GroupType groupType, String fieldName) {
+    for (Type type : groupType.getFields()) {
+      if (type.getName().equalsIgnoreCase(fieldName)) {
+        return type;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Searchs column names by name on a given Parquet schema, and returns its corresponded
+   * Parquet schema types.
+   *
+   * @param schema Group schema where to search for column names.
+   * @param colTypes List of column types.
+   * @return List of GroupType objects of projected columns.
+   */
+  private static List<Type> getProjectedGroupFields(GroupType schema,
+                                                    TypeDescription colTypes) {
+    List<Type> schemaTypes = new ArrayList<Type>();
+    List<String> fieldNames = colTypes.getFieldNames();
+    List<TypeDescription> fieldTypes = colTypes.getChildren();
+
+    for(int i=0; i < fieldNames.size(); ++i) {
+      TypeDescription colType = fieldTypes.get(i);
+      String colName = fieldNames.get(i);
+
+      Type fieldType = getFieldTypeIgnoreCase(schema, colName);
+      if (fieldType == null) {
+        schemaTypes.add(Types.optional(PrimitiveTypeName.BINARY).named(colName));
+      } else {
+        schemaTypes.add(getProjectedType(colType, fieldType));
+      }
+    }
+
+    return schemaTypes;
+  }
+
+  private static Type getProjectedType(TypeDescription colType, Type fieldType) {
+    switch (colType.getCategory()) {
+      case STRUCT:
+        List<Type> groupFields = getProjectedGroupFields(
+          fieldType.asGroupType(),
+          colType
+        );
+  
+        Type[] typesArray = groupFields.toArray(new Type[0]);
+        return Types.buildGroup(fieldType.getRepetition())
+          .addFields(typesArray)
+          .named(fieldType.getName());
+      case LIST:
+        TypeDescription elemType = colType.getChildren().get(0);
+        if (elemType.getCategory() == TypeDescription.Category.STRUCT) {
+          Type subFieldType = fieldType.asGroupType().getType(0);
+          if (!subFieldType.isPrimitive()) {
+            String subFieldName = subFieldType.getName();
+            Text name = new Text(subFieldName);
+            if (name.equals("array") || name.equals("list")) {
+              subFieldType = new GroupType(Repetition.REPEATED, subFieldName,
+                getProjectedType(elemType, subFieldType.asGroupType().getType(0)));
+            } else {
+              subFieldType = getProjectedType(elemType, subFieldType);
+            }
+            return Types.buildGroup(Repetition.OPTIONAL).as(OriginalType.LIST).addFields(
+              subFieldType).named(fieldType.getName());
+          }
+        }
+        break;
+      default:
+    }
+    return fieldType;
+  }
+
+  /**
+   * Searchs column names by name on a given Parquet message schema, and returns its projected
+   * Parquet schema types.
+   *
+   * @param schema Message type schema where to search for column names.
+   * @param colTypes The reader schema
+   * @return A MessageType object of projected columns.
+   */
+  private static MessageType getSchemaByName(MessageType schema,
+                                             TypeDescription colTypes) {
+    List<Type> projectedFields = getProjectedGroupFields(schema, colTypes);
+    Type[] typesArray = projectedFields.toArray(new Type[0]);
+
+    return Types.buildMessage()
+        .addFields(typesArray)
+        .named(schema.getName());
+  }
+
+  /**
+   * It creates the readContext for Parquet side with the requested schema during the init phase.
+   *
+   * @param context
+   * @return the parquet ReadContext
+   */
+  @Override
+  public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
+    Configuration configuration = context.getConfiguration();
+    MessageType fileSchema = context.getFileSchema();
+    Map<String, String> contextMetadata = new HashMap<String, String>();
+    String readerSchemaString = configuration.get("reader.schema");
+
+    if (readerSchemaString != null) {
+      this.hiveTypeInfo = TypeDescription.fromString(readerSchemaString);
+
+      MessageType tableSchema = getSchemaByName(fileSchema, hiveTypeInfo);
+
+      contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, tableSchema.toString());
+      contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(false));
+
+      return new ReadContext(tableSchema, contextMetadata);
+    } else {
+      contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, fileSchema.toString());
+      return new ReadContext(fileSchema, contextMetadata);
+    }
+  }
+
+  /**
+   *
+   * It creates the hive read support to interpret data from parquet to hive
+   *
+   * @param configuration // unused
+   * @param keyValueMetaData
+   * @param fileSchema // unused
+   * @param readContext containing the requested schema and the schema of the hive table
+   * @return Record Materialize for Hive
+   */
+  @Override
+  public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration configuration,
+      final Map<String, String> keyValueMetaData, final MessageType fileSchema,
+          final org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+    final Map<String, String> metadata = readContext.getReadSupportMetadata();
+    if (metadata == null) {
+      throw new IllegalStateException("ReadContext not initialized properly. " +
+        "Don't know the Hive Schema.");
+    }
+    return new DataWritableRecordConverter(readContext.getRequestedSchema(), metadata, hiveTypeInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableRecordConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableRecordConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableRecordConverter.java
new file mode 100644
index 0000000..9b1f4e5
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableRecordConverter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.util.Map;
+
+/**
+ *
+ * A MapWritableReadSupport, encapsulates the tuples
+ *
+ */
+public class DataWritableRecordConverter extends RecordMaterializer<ArrayWritable> {
+
+  private final HiveStructConverter root;
+
+  public DataWritableRecordConverter(final GroupType requestedSchema, final Map<String, String> metadata, TypeDescription hiveTypeInfo) {
+    this.root = new HiveStructConverter(requestedSchema,
+      MessageTypeParser.parseMessageType(metadata.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)),
+        metadata, hiveTypeInfo);
+  }
+
+  @Override
+  public ArrayWritable getCurrentRecord() {
+    return root.getCurrentArray();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java
new file mode 100644
index 0000000..f4621e5
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/DataWritableWriteSupport.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.write;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+/**
+ *
+ * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter
+ *
+ */
+public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
+
+  public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
+
+  private DataWritableWriter writer;
+  private MessageType schema;
+
+  public static void setSchema(final MessageType schema, final Configuration configuration) {
+    configuration.set(PARQUET_HIVE_SCHEMA, schema.toString());
+  }
+
+  public static MessageType getSchema(final Configuration configuration) {
+    return MessageTypeParser.parseMessageType(configuration.get(PARQUET_HIVE_SCHEMA));
+  }
+
+  @Override
+  public WriteContext init(final Configuration configuration) {
+    schema = getSchema(configuration);
+    return new WriteContext(schema, new HashMap<String, String>());
+  }
+
+  @Override
+  public void prepareForWrite(final RecordConsumer recordConsumer) {
+    writer = new DataWritableWriter(recordConsumer, schema);
+  }
+
+  @Override
+  public void write(final ParquetHiveRecord record) {
+    writer.write(record);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/ETypeConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ETypeConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/ETypeConverter.java
new file mode 100644
index 0000000..56dcfe7
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ETypeConverter.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcTimestamp;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ *
+ * ETypeConverter is an easy way to set the converter for the right type.
+ *
+ */
+public enum ETypeConverter {
+
+  EDOUBLE_CONVERTER(Double.TYPE) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      return new PrimitiveConverter() {
+        @Override
+        public void addDouble(final double value) {
+          parent.set(index, new DoubleWritable(value));
+        }
+      };
+    }
+  },
+  EBOOLEAN_CONVERTER(Boolean.TYPE) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      return new PrimitiveConverter() {
+        @Override
+        public void addBoolean(final boolean value) {
+          parent.set(index, new BooleanWritable(value));
+        }
+      };
+    }
+  },
+  EFLOAT_CONVERTER(Float.TYPE) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      if (hiveTypeInfo != null && hiveTypeInfo.getCategory() == TypeDescription.Category.DOUBLE) {
+        return new PrimitiveConverter() {
+          @Override
+          public void addFloat(final float value) {
+            parent.set(index, new DoubleWritable((double) value));
+          }
+        };
+      } else {
+        return new PrimitiveConverter() {
+          @Override
+          public void addFloat(final float value) {
+            parent.set(index, new FloatWritable(value));
+          }
+        };
+      }
+    }
+  },
+  EINT32_CONVERTER(Integer.TYPE) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index,
+        final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      if (hiveTypeInfo != null) {
+        switch (hiveTypeInfo.getCategory()) {
+          case LONG:
+          return new PrimitiveConverter() {
+            @Override
+            public void addInt(final int value) {
+              parent.set(index, new LongWritable((long) value));
+            }
+          };
+          case FLOAT:
+          return new PrimitiveConverter() {
+            @Override
+            public void addInt(final int value) {
+              parent.set(index, new FloatWritable((float) value));
+            }
+          };
+          case DOUBLE:
+          return new PrimitiveConverter() {
+            @Override
+            public void addInt(final int value) {
+              parent.set(index, new DoubleWritable((float) value));
+            }
+          };
+        }
+      }
+      return new PrimitiveConverter() {
+        @Override
+        public void addInt(final int value) {
+          parent.set(index, new IntWritable(value));
+        }
+      };
+    }
+  },
+  EINT64_CONVERTER(Long.TYPE) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      if(hiveTypeInfo != null) {
+        switch(hiveTypeInfo.getCategory()) {
+          case FLOAT:
+          return new PrimitiveConverter() {
+            @Override
+            public void addLong(final long value) {
+              parent.set(index, new FloatWritable(value));
+            }
+          };
+          case DOUBLE:
+          return new PrimitiveConverter() {
+            @Override
+            public void addLong(final long value) {
+              parent.set(index, new DoubleWritable(value));
+            }
+          };
+        }
+      }
+      return new PrimitiveConverter() {
+        @Override
+        public void addLong(final long value) {
+          parent.set(index, new LongWritable(value));
+        }
+      };
+    }
+  },
+  EBINARY_CONVERTER(Binary.class) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      return new BinaryConverter<BytesWritable>(type, parent, index) {
+        @Override
+        protected BytesWritable convert(Binary binary) {
+          return new BytesWritable(binary.getBytes());
+        }
+      };
+    }
+  },
+  ESTRING_CONVERTER(String.class) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      return new BinaryConverter<Text>(type, parent, index) {
+        @Override
+        protected Text convert(Binary binary) {
+          return new Text(binary.getBytes());
+        }
+      };
+    }
+  },
+  EDECIMAL_CONVERTER(BigDecimal.class) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      return new BinaryConverter<HiveDecimalWritable>(type, parent, index) {
+        @Override
+        protected HiveDecimalWritable convert(Binary binary) {
+          return new HiveDecimalWritable(binary.getBytes(), type.getDecimalMetadata().getScale());
+        }
+      };
+    }
+  },
+  ETIMESTAMP_CONVERTER(OrcTimestamp.class) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      return new BinaryConverter<OrcTimestamp>(type, parent, index) {
+        @Override
+        protected OrcTimestamp convert(Binary binary) {
+          NanoTime nt = NanoTime.fromBinary(binary);
+          Map<String, String> metadata = parent.getMetadata();
+          //Current Hive parquet timestamp implementation stores it in UTC, but other components do not do that.
+          //If this file written by current Hive implementation itself, we need to do the reverse conversion, else skip the conversion.
+          boolean skipConversion = true;
+          Timestamp ts = NanoTimeUtils.getTimestamp(nt, skipConversion);
+          return new OrcTimestamp(ts.getTime());
+        }
+      };
+    }
+  },
+  EDATE_CONVERTER(DateWritable.class) {
+    @Override
+    PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo) {
+      return new PrimitiveConverter() {
+        @Override
+        public void addInt(final int value) {
+          parent.set(index, new DateWritable(value));
+        }
+      };
+    }
+  };
+
+  final Class<?> _type;
+
+  private ETypeConverter(final Class<?> type) {
+    this._type = type;
+  }
+
+  private Class<?> getType() {
+    return _type;
+  }
+
+  abstract PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent, TypeDescription hiveTypeInfo);
+
+  public static PrimitiveConverter getNewConverter(final PrimitiveType type, final int index,
+                                                   final ConverterParent parent, TypeDescription hiveTypeInfo) {
+    if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
+      //TODO- cleanup once parquet support Timestamp type annotation.
+      return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
+    }
+    if (OriginalType.DECIMAL == type.getOriginalType()) {
+      return EDECIMAL_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
+    } else if (OriginalType.UTF8 == type.getOriginalType()) {
+      return ESTRING_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
+    } else if (OriginalType.DATE == type.getOriginalType()) {
+      return EDATE_CONVERTER.getConverter(type, index, parent, hiveTypeInfo);
+    }
+
+    Class<?> javaType = type.getPrimitiveTypeName().javaType;
+    for (final ETypeConverter eConverter : values()) {
+      if (eConverter.getType() == javaType) {
+        return eConverter.getConverter(type, index, parent, hiveTypeInfo);
+      }
+    }
+
+    throw new IllegalArgumentException("Converter not found ... for type : " + type);
+  }
+
+  public abstract static class BinaryConverter<T extends Writable> extends PrimitiveConverter {
+    protected final PrimitiveType type;
+    private final ConverterParent parent;
+    private final int index;
+    private ArrayList<T> lookupTable;
+
+    public BinaryConverter(PrimitiveType type, ConverterParent parent, int index) {
+      this.type = type;
+      this.parent = parent;
+      this.index = index;
+    }
+
+    protected abstract T convert(Binary binary);
+
+    @Override
+    public boolean hasDictionarySupport() {
+      return true;
+    }
+
+    @Override
+    public void setDictionary(Dictionary dictionary) {
+      int length = dictionary.getMaxId() + 1;
+      lookupTable = new ArrayList<T>();
+      for (int i = 0; i < length; i++) {
+        lookupTable.add(convert(dictionary.decodeToBinary(i)));
+      }
+    }
+
+    @Override
+    public void addValueFromDictionary(int dictionaryId) {
+      parent.set(index, lookupTable.get(dictionaryId));
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      parent.set(index, convert(value));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/FilterPredicateLeafBuilder.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/FilterPredicateLeafBuilder.java b/java/bench/src/java/org/apache/orc/bench/parquet/FilterPredicateLeafBuilder.java
new file mode 100644
index 0000000..958c439
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/FilterPredicateLeafBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+
+/**
+ * The base class for building parquet supported filter predicate in primary types.
+ */
+public abstract class FilterPredicateLeafBuilder {
+
+  /**
+   * Build filter predicate with multiple constants
+   *
+   * @param op         IN or BETWEEN
+   * @param literals
+   * @param columnName
+   * @return
+   */
+  public FilterPredicate buildPredicate(PredicateLeaf.Operator op, List<Object> literals,
+                                        String columnName) throws Exception {
+    FilterPredicate result = null;
+    switch (op) {
+      case IN:
+        for (Object literal : literals) {
+          if (result == null) {
+            result = buildPredict(PredicateLeaf.Operator.EQUALS, literal, columnName);
+          } else {
+            result = or(result, buildPredict(PredicateLeaf.Operator.EQUALS, literal,
+                columnName));
+          }
+        }
+        return result;
+      case BETWEEN:
+        if (literals.size() != 2) {
+          throw new RuntimeException(
+            "Not able to build 'between' operation filter with " + literals +
+              " which needs two literals");
+        }
+        Object min = literals.get(0);
+        Object max = literals.get(1);
+        FilterPredicate lt = not(buildPredict(PredicateLeaf.Operator.LESS_THAN,
+            min, columnName));
+        FilterPredicate gt = buildPredict(PredicateLeaf.Operator.LESS_THAN_EQUALS, max, columnName);
+        result = FilterApi.and(gt, lt);
+        return result;
+      default:
+        throw new RuntimeException("Unknown PredicateLeaf Operator type: " + op);
+    }
+  }
+
+  /**
+   * Build predicate with a single constant
+   *
+   * @param op         EQUALS, NULL_SAFE_EQUALS, LESS_THAN, LESS_THAN_EQUALS, IS_NULL
+   * @param constant
+   * @param columnName
+   * @return null or a FilterPredicate, null means no filter will be executed
+   */
+  public abstract FilterPredicate buildPredict(PredicateLeaf.Operator op, Object constant,
+                                               String columnName) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
new file mode 100644
index 0000000..a8834aa
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/HiveCollectionConverter.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.parquet;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
+
+public class HiveCollectionConverter extends HiveGroupConverter {
+  private final GroupType collectionType;
+  private final ConverterParent parent;
+  private final int index;
+  private final Converter innerConverter;
+  private final List<Writable> list = new ArrayList<Writable>();
+
+  public static HiveGroupConverter forMap(GroupType mapType,
+                                          ConverterParent parent,
+                                          int index, TypeDescription hiveTypeInfo) {
+    return new HiveCollectionConverter(
+        mapType, parent, index, true /* its a map */, hiveTypeInfo );
+  }
+
+  public static HiveGroupConverter forList(GroupType listType,
+                                           ConverterParent parent,
+                                           int index, TypeDescription hiveTypeInfo) {
+    return new HiveCollectionConverter(
+      listType, parent, index, false /* nUnknown hive type infoot a map */, hiveTypeInfo);
+  }
+
+  private HiveCollectionConverter(GroupType collectionType,
+                                  ConverterParent parent,
+                                  int index, boolean isMap, TypeDescription hiveTypeInfo) {
+    setMetadata(parent.getMetadata());
+    this.collectionType = collectionType;
+    this.parent = parent;
+    this.index = index;
+    Type repeatedType = collectionType.getType(0);
+    if (isMap) {
+      this.innerConverter = new KeyValueConverter(
+          repeatedType.asGroupType(), this, hiveTypeInfo);
+    } else if (isElementType(repeatedType, collectionType.getName())) {
+      this.innerConverter = getConverterFromDescription(repeatedType, 0, this, extractListCompatibleType(hiveTypeInfo));
+    } else {
+      this.innerConverter = new ElementConverter(
+          repeatedType.asGroupType(), this,  extractListCompatibleType(hiveTypeInfo));
+    }
+  }
+
+  private TypeDescription extractListCompatibleType(TypeDescription hiveTypeInfo) {
+    if (hiveTypeInfo !=  null && hiveTypeInfo.getCategory() == TypeDescription.Category.LIST) {
+      return hiveTypeInfo.getChildren().get(0);
+    } else {
+      return hiveTypeInfo; //to handle map can read list of struct data (i.e. list<struct<key, value>> --> map<key,
+      // value>)
+    }
+  }
+
+  @Override
+  public Converter getConverter(int fieldIndex) {
+    Preconditions.checkArgument(
+        fieldIndex == 0, "Invalid field index: " + fieldIndex);
+    return innerConverter;
+  }
+
+  @Override
+  public void start() {
+    list.clear();
+  }
+
+  @Override
+  public void end() {
+    parent.set(index, new ArrayWritable(
+        Writable.class, list.toArray(new Writable[0])));
+  }
+
+  @Override
+  public void set(int index, Writable value) {
+    list.add(value);
+  }
+
+  private static class KeyValueConverter extends HiveGroupConverter {
+    private final HiveGroupConverter parent;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private Writable[] keyValue = null;
+
+    public KeyValueConverter(GroupType keyValueType, HiveGroupConverter parent, TypeDescription hiveTypeInfo) {
+      setMetadata(parent.getMetadata());
+      this.parent = parent;
+      this.keyConverter = getConverterFromDescription(
+          keyValueType.getType(0), 0, this, hiveTypeInfo == null ? null : hiveTypeInfo.getChildren().get(0));
+      this.valueConverter = getConverterFromDescription(
+          keyValueType.getType(1), 1, this, hiveTypeInfo == null ? null : hiveTypeInfo.getChildren().get(1));
+    }
+
+    @Override
+    public void set(int fieldIndex, Writable value) {
+      keyValue[fieldIndex] = value;
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      switch (fieldIndex) {
+        case 0:
+          return keyConverter;
+        case 1:
+          return valueConverter;
+        default:
+          throw new IllegalArgumentException(
+              "Invalid field index for map key-value: " + fieldIndex);
+      }
+    }
+
+    @Override
+    public void start() {
+      this.keyValue = new Writable[2];
+    }
+
+    @Override
+    public void end() {
+      parent.set(0, new ArrayWritable(Writable.class, keyValue));
+    }
+  }
+
+  private static class ElementConverter extends HiveGroupConverter {
+    private final HiveGroupConverter parent;
+    private final Converter elementConverter;
+    private Writable element = null;
+
+    public ElementConverter(GroupType repeatedType, HiveGroupConverter parent, TypeDescription hiveTypeInfo) {
+      setMetadata(parent.getMetadata());
+      this.parent = parent;
+      this.elementConverter = getConverterFromDescription(
+          repeatedType.getType(0), 0, this, hiveTypeInfo);
+    }
+
+    @Override
+    public void set(int index, Writable value) {
+      this.element = value;
+    }
+
+    @Override
+    public Converter getConverter(int i) {
+      return elementConverter;
+    }
+
+    @Override
+    public void start() {
+      this.element = null;
+    }
+
+    @Override
+    public void end() {
+      parent.set(0, element);
+    }
+  }
+
+  private static boolean isElementType(Type repeatedType, String parentName) {
+    if (repeatedType.isPrimitive() ||
+        (repeatedType.asGroupType().getFieldCount() != 1)) {
+      return true;
+    } else if (repeatedType.getName().equals("array")) {
+      return true; // existing avro data
+    } else if (repeatedType.getName().equals(parentName + "_tuple")) {
+      return true; // existing thrift data
+    }
+    // false for the following cases:
+    // * name is "list", which matches the spec
+    // * name is "bag", which indicates existing hive or pig data
+    // * ambiguous case, which should be assumed is 3-level according to spec
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/HiveGroupConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/HiveGroupConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/HiveGroupConverter.java
new file mode 100644
index 0000000..d41fb7e
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/HiveGroupConverter.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.util.Map;
+
+public abstract class HiveGroupConverter extends GroupConverter implements ConverterParent {
+
+  private Map<String, String> metadata;
+
+  public void setMetadata(Map<String, String> metadata) {
+    this.metadata = metadata;
+  }
+
+  public Map<String, String> getMetadata() {
+    return metadata;
+  }
+
+  protected static PrimitiveConverter getConverterFromDescription(PrimitiveType type, int index, ConverterParent
+      parent, TypeDescription hiveTypeInfo) {
+    if (type == null) {
+      return null;
+    }
+
+    return ETypeConverter.getNewConverter(type, index, parent, hiveTypeInfo);
+  }
+
+  protected static HiveGroupConverter getConverterFromDescription(GroupType type, int index, ConverterParent parent,
+                                                                  TypeDescription hiveTypeInfo) {
+    if (type == null) {
+      return null;
+    }
+
+    OriginalType annotation = type.getOriginalType();
+    if (annotation == OriginalType.LIST) {
+      return HiveCollectionConverter.forList(type, parent, index, hiveTypeInfo);
+    } else if (annotation == OriginalType.MAP || annotation == OriginalType.MAP_KEY_VALUE) {
+      return HiveCollectionConverter.forMap(type, parent, index, hiveTypeInfo);
+    }
+
+    return new HiveStructConverter(type, parent, index, hiveTypeInfo);
+  }
+
+  protected static Converter getConverterFromDescription(Type type, int index, ConverterParent parent, TypeDescription hiveTypeInfo) {
+    if (type == null) {
+      return null;
+    }
+
+    if (type.isPrimitive()) {
+      return getConverterFromDescription(type.asPrimitiveType(), index, parent, hiveTypeInfo);
+    }
+
+    return getConverterFromDescription(type.asGroupType(), index, parent, hiveTypeInfo);
+  }
+
+  public abstract void set(int index, Writable value);
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/HiveSchemaConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/HiveSchemaConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/HiveSchemaConverter.java
new file mode 100644
index 0000000..7243fb6
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/HiveSchemaConverter.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.util.List;
+
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.ConversionPatterns;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+public class HiveSchemaConverter {
+
+  // Map precision to the number bytes needed for binary conversion.
+  static final int PRECISION_TO_BYTE_COUNT[] = new int[38];
+  static {
+    for (int prec = 1; prec <= 38; prec++) {
+      // Estimated number of bytes needed.
+      PRECISION_TO_BYTE_COUNT[prec - 1] = (int)
+          Math.ceil((Math.log(Math.pow(10, prec) - 1) / Math.log(2) + 1) / 8);
+    }
+  }
+
+  public static MessageType convert(TypeDescription hiveType) {
+    final MessageType schema = new MessageType("hive_schema", convertTypes(hiveType.getFieldNames(), hiveType.getChildren()));
+    return schema;
+  }
+
+  private static Type[] convertTypes(final List<String> columnNames, final List<TypeDescription> columnTypes) {
+    if (columnNames.size() != columnTypes.size()) {
+      throw new IllegalStateException("Mismatched Hive columns and types. Hive columns names" +
+        " found : " + columnNames + " . And Hive types found : " + columnTypes);
+    }
+    final Type[] types = new Type[columnNames.size()];
+    for (int i = 0; i < columnNames.size(); ++i) {
+      types[i] = convertType(columnNames.get(i), columnTypes.get(i));
+    }
+    return types;
+  }
+
+  private static Type convertType(final String name, final TypeDescription typeInfo) {
+    return convertType(name, typeInfo, Repetition.OPTIONAL);
+  }
+
+  private static Type convertType(final String name,
+                                  final TypeDescription typeInfo,
+                                  final Repetition repetition) {
+    switch (typeInfo.getCategory()) {
+      case STRING:
+        return Types.primitive(PrimitiveTypeName.BINARY, repetition)
+            .as(OriginalType.UTF8)
+            .named(name);
+      case BYTE:
+        return Types.primitive(PrimitiveTypeName.INT32, repetition).named(name);
+      case SHORT:
+        return Types.primitive(PrimitiveTypeName.INT32, repetition).named(name);
+      case INT:
+        return Types.primitive(PrimitiveTypeName.INT32, repetition).named(name);
+      case LONG:
+        return Types.primitive(PrimitiveTypeName.INT64, repetition).named(name);
+      case FLOAT:
+        return Types.primitive(PrimitiveTypeName.FLOAT, repetition).named(name);
+      case DOUBLE:
+        return Types.primitive(PrimitiveTypeName.DOUBLE, repetition).named(name);
+      case BOOLEAN:
+        return Types.primitive(PrimitiveTypeName.BOOLEAN, repetition).named(name);
+      case TIMESTAMP:
+        return Types.primitive(PrimitiveTypeName.INT96, repetition).named(name);
+      case BINARY:
+        return Types.primitive(PrimitiveTypeName.BINARY, repetition).named(name);
+      case CHAR:
+        return Types.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
+            .named(name);
+      case VARCHAR:
+        return Types.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
+            .named(name);
+      case DECIMAL: {
+        int prec = typeInfo.getPrecision();
+        int scale = typeInfo.getScale();
+        int bytes = PRECISION_TO_BYTE_COUNT[prec - 1];
+        return Types.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(bytes)
+            .as(OriginalType.DECIMAL).
+            scale(scale).precision(prec).named(name);
+      }
+      case DATE:
+        return Types.primitive(PrimitiveTypeName.INT32, repetition)
+            .as(OriginalType.DATE).named(name);
+      case LIST:
+        return convertArrayType(name, typeInfo);
+      case MAP:
+        return convertMapType(name, typeInfo);
+      case STRUCT:
+        return convertStructType(name, typeInfo);
+      default:
+        throw new IllegalArgumentException("Unimplemented type " + typeInfo);
+
+    }
+  }
+
+  // An optional group containing a repeated anonymous group "bag", containing
+  // 1 anonymous element "array_element"
+  @SuppressWarnings("deprecation")
+  private static GroupType convertArrayType(final String name, final TypeDescription typeInfo) {
+    return new GroupType(Repetition.OPTIONAL, name, OriginalType.LIST,
+        new GroupType(Repetition.REPEATED,
+        "array", convertType("array_element", typeInfo.getChildren().get(0))));
+  }
+
+  // An optional group containing multiple elements
+  private static GroupType convertStructType(final String name, final TypeDescription typeInfo) {
+    return new GroupType(Repetition.OPTIONAL, name, convertTypes(typeInfo.getFieldNames(), typeInfo.getChildren()));
+
+  }
+
+  // An optional group containing a repeated anonymous group "map", containing
+  // 2 elements: "key", "value"
+  private static GroupType convertMapType(final String name, final TypeDescription typeInfo) {
+    final Type keyType = convertType("key",
+        typeInfo.getChildren().get(0), Repetition.REQUIRED);
+    final Type valueType = convertType("value",
+        typeInfo.getChildren().get(1));
+    return ConversionPatterns.mapType(Repetition.OPTIONAL, name, keyType, valueType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/1752e172/java/bench/src/java/org/apache/orc/bench/parquet/HiveStructConverter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/HiveStructConverter.java b/java/bench/src/java/org/apache/orc/bench/parquet/HiveStructConverter.java
new file mode 100644
index 0000000..250b4a2
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/HiveStructConverter.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.parquet;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
+
+/**
+ *
+ * A MapWritableGroupConverter, real converter between hive and parquet types recursively for complex types.
+ *
+ */
+public class HiveStructConverter extends HiveGroupConverter {
+
+  private final int totalFieldCount;
+  private Converter[] converters;
+  private final ConverterParent parent;
+  private final int index;
+  private Writable[] writables;
+  private List<Repeated> repeatedConverters;
+  private boolean reuseWritableArray = false;
+  private List<String> hiveFieldNames;
+  private List<TypeDescription> hiveFieldTypeInfos;
+
+  public HiveStructConverter(final GroupType requestedSchema, final GroupType tableSchema,
+                             Map<String, String> metadata, TypeDescription hiveTypeInfo) {
+    setMetadata(metadata);
+    this.reuseWritableArray = true;
+    this.writables = new Writable[tableSchema.getFieldCount()];
+    this.parent = null;
+    this.index = 0;
+    this.totalFieldCount = tableSchema.getFieldCount();
+    init(requestedSchema, null, 0, tableSchema, hiveTypeInfo);
+  }
+
+  public HiveStructConverter(final GroupType groupType, final ConverterParent parent,
+                             final int index, TypeDescription hiveTypeInfo) {
+    this(groupType, parent, index, groupType, hiveTypeInfo);
+  }
+
+  public HiveStructConverter(final GroupType selectedGroupType,
+                             final ConverterParent parent, final int index, final GroupType containingGroupType, TypeDescription hiveTypeInfo) {
+    this.parent = parent;
+    this.index = index;
+    this.totalFieldCount = containingGroupType.getFieldCount();
+    init(selectedGroupType, parent, index, containingGroupType, hiveTypeInfo);
+  }
+
+  private void init(final GroupType selectedGroupType,
+                    final ConverterParent parent, final int index, final GroupType containingGroupType, TypeDescription hiveTypeInfo) {
+    if (parent != null) {
+      setMetadata(parent.getMetadata());
+    }
+    final int selectedFieldCount = selectedGroupType.getFieldCount();
+
+    converters = new Converter[selectedFieldCount];
+    this.repeatedConverters = new ArrayList<>();
+
+    if (hiveTypeInfo != null && hiveTypeInfo.getCategory() == TypeDescription.Category.STRUCT) {
+      this.hiveFieldNames = hiveTypeInfo.getFieldNames();
+      this.hiveFieldTypeInfos = hiveTypeInfo.getChildren();
+    }
+
+    List<Type> selectedFields = selectedGroupType.getFields();
+    for (int i = 0; i < selectedFieldCount; i++) {
+      Type subtype = selectedFields.get(i);
+      if (containingGroupType.getFields().contains(subtype)) {
+        int fieldIndex = containingGroupType.getFieldIndex(subtype.getName());
+        TypeDescription _hiveTypeInfo = getFieldTypeIgnoreCase(hiveTypeInfo, subtype.getName(), fieldIndex);
+        converters[i] = getFieldConverter(subtype, fieldIndex, _hiveTypeInfo);
+      } else {
+        throw new IllegalStateException("Group type [" + containingGroupType +
+            "] does not contain requested field: " + subtype);
+      }
+    }
+  }
+
+  private TypeDescription getFieldTypeIgnoreCase(TypeDescription hiveTypeInfo, String fieldName, int fieldIndex) {
+    if (hiveTypeInfo == null) {
+      return null;
+    } else if (hiveTypeInfo.getCategory() == TypeDescription.Category.STRUCT) {
+      return getStructFieldTypeInfo(fieldName, fieldIndex);
+    } else if (hiveTypeInfo.getCategory() == TypeDescription.Category.MAP) {
+      //This cover the case where hive table may have map<key, value> but the data file is
+      // of type array<struct<value1, value2>>
+      //Using index in place of type name.
+      if (fieldIndex == 0) {
+        return hiveTypeInfo.getChildren().get(0);
+      } else if (fieldIndex == 1) {
+        return hiveTypeInfo.getChildren().get(1);
+      } else {//Other fields are skipped for this case
+        return null;
+      }
+    }
+    throw new RuntimeException("Unknown hive type info " + hiveTypeInfo + " when searching for field " + fieldName);
+  }
+
+  private TypeDescription getStructFieldTypeInfo(String field, int fieldIndex) {
+    String fieldLowerCase = field.toLowerCase();
+    if (Boolean.parseBoolean(getMetadata().get(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS))
+        && fieldIndex < hiveFieldNames.size()) {
+      return hiveFieldTypeInfos.get(fieldIndex);
+    }
+    for (int i = 0; i < hiveFieldNames.size(); i++) {
+      if (fieldLowerCase.equalsIgnoreCase(hiveFieldNames.get(i))) {
+        return hiveFieldTypeInfos.get(i);
+      }
+    }
+    //This means hive type doesn't refer this field that comes from file schema.
+    //i.e. the field is not required for hive table. It can occur due to schema
+    //evolution where some field is deleted.
+    return null;
+  }
+
+  private Converter getFieldConverter(Type type, int fieldIndex, TypeDescription hiveTypeInfo) {
+    Converter converter;
+    if (type.isRepetition(Type.Repetition.REPEATED)) {
+      if (type.isPrimitive()) {
+        converter = new Repeated.RepeatedPrimitiveConverter(
+            type.asPrimitiveType(), this, fieldIndex, hiveTypeInfo);
+      } else {
+        converter = new Repeated.RepeatedGroupConverter(
+            type.asGroupType(), this, fieldIndex, hiveTypeInfo == null ? null : hiveTypeInfo.getChildren().get(0));
+      }
+
+      repeatedConverters.add((Repeated) converter);
+    } else {
+      converter = getConverterFromDescription(type, fieldIndex, this, hiveTypeInfo);
+    }
+
+    return converter;
+  }
+
+  public final ArrayWritable getCurrentArray() {
+    return new ArrayWritable(Writable.class, writables);
+  }
+
+  @Override
+  public void set(int fieldIndex, Writable value) {
+    writables[fieldIndex] = value;
+  }
+
+  @Override
+  public Converter getConverter(final int fieldIndex) {
+    return converters[fieldIndex];
+  }
+
+  @Override
+  public void start() {
+    if (reuseWritableArray) {
+      // reset the array to null values
+      for (int i = 0; i < writables.length; i += 1) {
+        writables[i] = null;
+      }
+    } else {
+      this.writables = new Writable[totalFieldCount];
+    }
+    for (Repeated repeated : repeatedConverters) {
+      repeated.parentStart();
+    }
+  }
+
+  @Override
+  public void end() {
+    for (Repeated repeated : repeatedConverters) {
+      repeated.parentEnd();
+    }
+    if (parent != null) {
+      parent.set(index, getCurrentArray());
+    }
+  }
+
+}