Posted to commits@orc.apache.org by om...@apache.org on 2016/10/17 18:25:11 UTC

[09/15] orc git commit: more updates

more updates


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/5b37113b
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/5b37113b
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/5b37113b

Branch: refs/heads/orc-72
Commit: 5b37113b73eb0e12744f2711326e11cd2ef6eaef
Parents: 86628bc
Author: Owen O'Malley <om...@apache.org>
Authored: Mon Oct 3 10:01:40 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Oct 10 13:59:16 2016 -0700

----------------------------------------------------------------------
 java/bench/pom.xml                              |   4 +
 .../src/java/org/apache/orc/bench/AvroScan.java |  47 ---
 .../org/apache/orc/bench/AvroSchemaUtils.java   | 190 ----------
 .../java/org/apache/orc/bench/AvroWriter.java   | 375 -------------------
 .../java/org/apache/orc/bench/CsvReader.java    | 175 ---------
 .../src/java/org/apache/orc/bench/CsvScan.java  |  40 --
 .../java/org/apache/orc/bench/GithubToAvro.java |   2 +
 .../java/org/apache/orc/bench/GithubToJson.java |   2 +-
 .../java/org/apache/orc/bench/GithubToOrc.java  |   4 +-
 .../org/apache/orc/bench/GithubToParquet.java   |   2 +
 .../java/org/apache/orc/bench/JsonReader.java   | 278 --------------
 .../src/java/org/apache/orc/bench/JsonScan.java |  61 ---
 .../src/java/org/apache/orc/bench/OrcScan.java  |  46 ---
 .../java/org/apache/orc/bench/ParquetScan.java  |  54 ---
 .../java/org/apache/orc/bench/SalesToAvro.java  |   1 +
 .../org/apache/orc/bench/SalesToParquet.java    |   1 +
 .../java/org/apache/orc/bench/TaxiToAvro.java   |   2 +
 .../java/org/apache/orc/bench/TaxiToJson.java   |   1 +
 .../java/org/apache/orc/bench/TaxiToOrc.java    |   1 +
 .../org/apache/orc/bench/TaxiToParquet.java     |   2 +
 .../org/apache/orc/bench/avro/AvroScan.java     |  47 +++
 .../apache/orc/bench/avro/AvroSchemaUtils.java  | 190 ++++++++++
 .../org/apache/orc/bench/avro/AvroWriter.java   | 375 +++++++++++++++++++
 .../org/apache/orc/bench/csv/CsvReader.java     | 175 +++++++++
 .../java/org/apache/orc/bench/csv/CsvScan.java  |  41 ++
 .../org/apache/orc/bench/json/JsonReader.java   | 278 ++++++++++++++
 .../org/apache/orc/bench/json/JsonScan.java     |  61 +++
 .../java/org/apache/orc/bench/orc/OrcScan.java  |  46 +++
 .../apache/orc/bench/parquet/ParquetScan.java   |  54 +++
 java/pom.xml                                    |  15 +-
 30 files changed, 1295 insertions(+), 1275 deletions(-)
----------------------------------------------------------------------
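
In short, judging from the file list above: this patch splits the benchmark suite into per-format subpackages. AvroScan, AvroSchemaUtils, AvroWriter, CsvReader, CsvScan, JsonReader, JsonScan, OrcScan, and ParquetScan move from org.apache.orc.bench into org.apache.orc.bench.{avro,csv,json,orc,parquet}, the converter drivers (GithubTo*, SalesTo*, TaxiTo*) pick up the new imports, and java/bench/pom.xml gains a parquet-hadoop dependency. For callers the change is import-only, e.g.:

    // before this patch
    import org.apache.orc.bench.AvroWriter;
    // after this patch
    import org.apache.orc.bench.avro.AvroWriter;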


http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/pom.xml
----------------------------------------------------------------------
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index f0bf55a..f40f21b 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -67,6 +67,10 @@
       <artifactId>hive-storage-api</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.openjdk.jmh</groupId>
       <artifactId>jmh-core</artifactId>
     </dependency>
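
This new dependency provides org.apache.parquet.hadoop.ParquetInputFormat, which the relocated ParquetScan benchmark (its deleted copy appears further below) uses directly:

    import org.apache.parquet.hadoop.ParquetInputFormat;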

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
deleted file mode 100644
index 61f6a62..0000000
--- a/java/bench/src/java/org/apache/orc/bench/AvroScan.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-public class AvroScan {
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    long rowCount = 0;
-    for(String filename: args) {
-      FsInput file = new FsInput(new Path(filename), conf);
-      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-      DataFileReader<GenericRecord> dataFileReader =
-          new DataFileReader<>(file, datumReader);
-      GenericRecord record = null;
-      while (dataFileReader.hasNext()) {
-        record = dataFileReader.next(record);
-        rowCount += 1;
-      }
-    }
-    System.out.println("Rows read: " + rowCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
deleted file mode 100644
index 02931c3..0000000
--- a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.bench;
-
-import org.apache.avro.Schema;
-import org.apache.orc.TypeDescription;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Convert Hive TypeInfo to an Avro Schema
- */
-public class AvroSchemaUtils {
-
-  private AvroSchemaUtils() {
-    // No instances
-  }
-
-  public static Schema createAvroSchema(TypeDescription typeInfo) {
-    Schema schema;
-    switch (typeInfo.getCategory()) {
-      case STRING:
-        schema = Schema.create(Schema.Type.STRING);
-        break;
-      case CHAR:
-        schema = getSchemaFor("{" +
-            "\"type\":\"string\"," +
-            "\"logicalType\":\"char\"," +
-            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
-        break;
-      case VARCHAR:
-        schema = getSchemaFor("{" +
-            "\"type\":\"string\"," +
-            "\"logicalType\":\"varchar\"," +
-            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
-        break;
-      case BINARY:
-        schema = Schema.create(Schema.Type.BYTES);
-        break;
-      case BYTE:
-        schema = Schema.create(Schema.Type.INT);
-        break;
-      case SHORT:
-        schema = Schema.create(Schema.Type.INT);
-        break;
-      case INT:
-        schema = Schema.create(Schema.Type.INT);
-        break;
-      case LONG:
-        schema = Schema.create(Schema.Type.LONG);
-        break;
-      case FLOAT:
-        schema = Schema.create(Schema.Type.FLOAT);
-        break;
-      case DOUBLE:
-        schema = Schema.create(Schema.Type.DOUBLE);
-        break;
-      case BOOLEAN:
-        schema = Schema.create(Schema.Type.BOOLEAN);
-        break;
-      case DECIMAL:
-        String precision = String.valueOf(typeInfo.getPrecision());
-        String scale = String.valueOf(typeInfo.getScale());
-        schema = getSchemaFor("{" +
-            "\"type\":\"bytes\"," +
-            "\"logicalType\":\"decimal\"," +
-            "\"precision\":" + precision + "," +
-            "\"scale\":" + scale + "}");
-        break;
-      case DATE:
-        schema = getSchemaFor("{" +
-            "\"type\":\"int\"," +
-            "\"logicalType\":\"date\"}");
-        break;
-      case TIMESTAMP:
-        schema = getSchemaFor("{" +
-            "\"type\":\"long\"," +
-            "\"logicalType\":\"timestamp-millis\"}");
-        break;
-      case LIST:
-        schema = createAvroArray(typeInfo);
-        break;
-      case MAP:
-        schema = createAvroMap(typeInfo);
-        break;
-      case STRUCT:
-        schema = createAvroRecord(typeInfo);
-        break;
-      case UNION:
-        schema = createAvroUnion(typeInfo);
-        break;
-      default:
-        throw new UnsupportedOperationException(typeInfo + " is not supported.");
-    }
-
-    return wrapInUnionWithNull(schema);
-  }
-
-  private static Schema createAvroUnion(TypeDescription typeInfo) {
-    List<Schema> childSchemas = new ArrayList<>();
-    for (TypeDescription childTypeInfo : typeInfo.getChildren()) {
-      Schema childSchema = createAvroSchema(childTypeInfo);
-      if (childSchema.getType() == Schema.Type.UNION) {
-        for (Schema grandkid: childSchema.getTypes()) {
-          if (childSchema.getType() != Schema.Type.NULL) {
-            childSchemas.add(grandkid);
-          }
-        }
-      } else {
-        childSchemas.add(childSchema);
-      }
-    }
-
-    return Schema.createUnion(childSchemas);
-  }
-
-  private static Schema createAvroRecord(TypeDescription typeInfo) {
-    List<Schema.Field> childFields = new ArrayList<>();
-
-    List<String> fieldNames = typeInfo.getFieldNames();
-    List<TypeDescription> fieldTypes = typeInfo.getChildren();
-
-    for (int i = 0; i < fieldNames.size(); ++i) {
-      TypeDescription childTypeInfo = fieldTypes.get(i);
-      Schema.Field field = new Schema.Field(fieldNames.get(i),
-          createAvroSchema(childTypeInfo), childTypeInfo.toString(),
-          (Object) null);
-      childFields.add(field);
-    }
-
-    Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(),
-        typeInfo.toString(), null, false);
-    recordSchema.setFields(childFields);
-    return recordSchema;
-  }
-
-  private static Schema createAvroMap(TypeDescription typeInfo) {
-    TypeDescription keyTypeInfo = typeInfo.getChildren().get(0);
-    if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) {
-      throw new UnsupportedOperationException("Avro only supports maps with string keys "
-          + typeInfo);
-    }
-
-    Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1));
-
-    return Schema.createMap(valueSchema);
-  }
-
-  private static Schema createAvroArray(TypeDescription typeInfo) {
-    Schema child = createAvroSchema(typeInfo.getChildren().get(0));
-    return Schema.createArray(child);
-  }
-
-  private static Schema wrapInUnionWithNull(Schema schema) {
-    Schema NULL = Schema.create(Schema.Type.NULL);
-    switch (schema.getType()) {
-      case NULL:
-        return schema;
-      case UNION:
-        List<Schema> kids = schema.getTypes();
-        List<Schema> newKids = new ArrayList<>(kids.size() + 1);
-        newKids.add(NULL);
-        return Schema.createUnion(newKids);
-      default:
-        return Schema.createUnion(Arrays.asList(NULL, schema));
-    }
-  }
-
-  private static Schema getSchemaFor(String str) {
-    Schema.Parser parser = new Schema.Parser();
-    return parser.parse(str);
-  }
-}
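
A quick note on the class above (unchanged by the move): every produced schema comes back through wrapInUnionWithNull, so each level, including the root, is a union with null, reflecting that all Hive/ORC types are nullable. A minimal sketch of calling it, with hypothetical field names and the types as imported above:

    TypeDescription orcSchema =
        TypeDescription.fromString("struct<name:string,age:int>");
    // Yields union(null, record); each field schema is union-wrapped too.
    Schema avroSchema = AvroSchemaUtils.createAvroSchema(orcSchema);
    System.out.println(avroSchema.toString(true));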

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
deleted file mode 100644
index 094d115..0000000
--- a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-public class AvroWriter {
-
-  static Properties setHiveSchema(TypeDescription schema) {
-    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
-      throw new IllegalArgumentException("Assumes struct type as root, not " +
-          schema);
-    }
-    StringBuilder fieldNames = new StringBuilder();
-    StringBuilder fieldTypes = new StringBuilder();
-    List<String> childNames = schema.getFieldNames();
-    List<TypeDescription> childTypes = schema.getChildren();
-    for(int f=0; f < childNames.size(); ++f) {
-      if (f != 0) {
-        fieldNames.append(',');
-        fieldTypes.append(',');
-      }
-      fieldNames.append(childNames.get(f));
-      fieldTypes.append(childTypes.get(f).toString());
-    }
-    Properties properties = new Properties();
-    properties.put("columns", fieldNames.toString());
-    properties.put("columns.types", fieldTypes.toString());
-    return properties;
-  }
-
-  interface AvroConverter {
-    Object convert(ColumnVector vector, int row);
-  }
-
-  private static class BooleanConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        LongColumnVector vector = (LongColumnVector) cv;
-        return vector.vector[row] != 0;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class IntConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        LongColumnVector vector = (LongColumnVector) cv;
-        return (int) vector.vector[row];
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class LongConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        LongColumnVector vector = (LongColumnVector) cv;
-        return vector.vector[row];
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class FloatConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        DoubleColumnVector vector = (DoubleColumnVector) cv;
-        return (float) vector.vector[row];
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class DoubleConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        DoubleColumnVector vector = (DoubleColumnVector) cv;
-        return vector.vector[row];
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class StringConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        BytesColumnVector vector = (BytesColumnVector) cv;
-        return new String(vector.vector[row], vector.start[row],
-            vector.length[row]);
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class BinaryConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        BytesColumnVector vector = (BytesColumnVector) cv;
-        return ByteBuffer.wrap(vector.vector[row], vector.start[row],
-            vector.length[row]);
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class TimestampConverter implements AvroConverter {
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        TimestampColumnVector vector = (TimestampColumnVector) cv;
-        return vector.time[row];
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class DecimalConverter implements AvroConverter {
-    final int scale;
-    DecimalConverter(int scale) {
-      this.scale = scale;
-    }
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        DecimalColumnVector vector = (DecimalColumnVector) cv;
-        return getBufferFromDecimal(
-            vector.vector[row].getHiveDecimal(), scale);
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class ListConverter implements AvroConverter {
-    final Schema avroSchema;
-    final AvroConverter childConverter;
-
-    ListConverter(TypeDescription schema, Schema avroSchema) {
-      this.avroSchema = avroSchema;
-      childConverter = createConverter(schema.getChildren().get(0),
-          removeNullable(avroSchema.getElementType()));
-    }
-
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        ListColumnVector vector = (ListColumnVector) cv;
-        int offset = (int) vector.offsets[row];
-        int length = (int) vector.lengths[row];
-        GenericData.Array result = new GenericData.Array(length, avroSchema);
-        for(int i=0; i < length; ++i) {
-          result.add(childConverter.convert(vector.child, offset + i));
-        }
-        return result;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  private static class StructConverter implements AvroConverter {
-    final Schema avroSchema;
-    final AvroConverter[] childConverters;
-
-    StructConverter(TypeDescription schema, Schema avroSchema) {
-      this.avroSchema = avroSchema;
-      List<TypeDescription> childrenTypes = schema.getChildren();
-      childConverters = new AvroConverter[childrenTypes.size()];
-      List<Schema.Field> fields = avroSchema.getFields();
-      for(int f=0; f < childConverters.length; ++f) {
-        childConverters[f] = createConverter(childrenTypes.get(f),
-            removeNullable(fields.get(f).schema()));
-      }
-    }
-
-    public Object convert(ColumnVector cv, int row) {
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        StructColumnVector vector = (StructColumnVector) cv;
-        GenericData.Record result = new GenericData.Record(avroSchema);
-        for(int f=0; f < childConverters.length; ++f) {
-          result.put(f, childConverters[f].convert(vector.fields[f], row));
-        }
-        return result;
-      } else {
-        return null;
-      }
-    }
-  }
-
-  static AvroConverter createConverter(TypeDescription types,
-                                       Schema avroSchema) {
-    switch (types.getCategory()) {
-      case BINARY:
-        return new BinaryConverter();
-      case BOOLEAN:
-        return new BooleanConverter();
-      case BYTE:
-      case SHORT:
-      case INT:
-        return new IntConverter();
-      case LONG:
-        return new LongConverter();
-      case FLOAT:
-        return new FloatConverter();
-      case DOUBLE:
-        return new DoubleConverter();
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        return new StringConverter();
-      case TIMESTAMP:
-        return new TimestampConverter();
-      case DECIMAL:
-        return new DecimalConverter(types.getScale());
-      case LIST:
-        return new ListConverter(types, avroSchema);
-      case STRUCT:
-        return new StructConverter(types, avroSchema);
-      default:
-        throw new IllegalArgumentException("Unhandled type " + types);
-    }
-  }
-
-  /**
-   * Remove the union(null, ...) wrapper around the schema.
-   *
-   * All of the types in Hive are nullable and in Avro those are represented
-   * by wrapping each type in a union type with the void type.
-   * @param avro The avro type
-   * @return The avro type with the nullable layer removed
-   */
-  static Schema removeNullable(Schema avro) {
-    while (avro.getType() == Schema.Type.UNION) {
-      List<Schema> children = avro.getTypes();
-      if (children.size() == 2 &&
-          children.get(0).getType() == Schema.Type.NULL) {
-        avro = children.get(1);
-      } else {
-        break;
-      }
-    }
-    return avro;
-  }
-
-  private final AvroConverter[] converters;
-  private final DataFileWriter writer;
-  private final GenericRecord record;
-
-  public AvroWriter(Path path, TypeDescription schema,
-                    Configuration conf,
-                    String compression) throws IOException {
-    List<TypeDescription> childTypes = schema.getChildren();
-    Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema);
-    List<Schema.Field> avroFields = avroSchema.getFields();
-    converters = new AvroConverter[childTypes.size()];
-    for(int c=0; c < converters.length; ++c) {
-      converters[c] = createConverter(childTypes.get(c),
-          removeNullable(avroFields.get(c).schema()));
-    }
-    GenericDatumWriter gdw = new GenericDatumWriter(avroSchema);
-    writer = new DataFileWriter(gdw);
-    if (compression != null & !"".equals(compression)) {
-      writer.setCodec(CodecFactory.fromString(compression));
-    }
-    writer.create(avroSchema, path.getFileSystem(conf).create(path));
-    record = new GenericData.Record(avroSchema);
-  }
-
-  public void writeBatch(VectorizedRowBatch batch) throws IOException {
-    for(int r=0; r < batch.size; ++r) {
-      for(int f=0; f < batch.cols.length; ++f) {
-        record.put(f, converters[f].convert(batch.cols[f], r));
-      }
-      writer.append(record);
-    }
-  }
-
-  public void close() throws IOException {
-    writer.close();
-  }
-
-  static Buffer getBufferFromBytes(byte[] input) {
-    ByteBuffer bb = ByteBuffer.wrap(input);
-    return bb.rewind();
-  }
-
-  public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) {
-    if (dec == null) {
-      return null;
-    }
-
-    dec = dec.setScale(scale);
-    return getBufferFromBytes(dec.unscaledValue().toByteArray());
-  }
-}
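
One detail worth noting in the writer above: since Hive types are all nullable, the generated Avro schema wraps every type as union(null, T), and removeNullable strips that wrapper before a converter is picked. A small sketch (removeNullable is package-private, so this assumes code living in the same package, with the Avro types imported as above):

    Schema nullableInt = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)));
    Schema plainInt = AvroWriter.removeNullable(nullableInt);
    // plainInt.getType() == Schema.Type.INT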

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/CsvReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/CsvReader.java
deleted file mode 100644
index 5c86a89..0000000
--- a/java/bench/src/java/org/apache/orc/bench/CsvReader.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.bench;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.zip.GZIPInputStream;
-
-public class CsvReader {
-  private final Iterator<CSVRecord> parser;
-  private final ColumnReader[] readers;
-
-  interface ColumnReader {
-    void read(String value, ColumnVector vect, int row);
-  }
-
-  static class LongColumnReader implements ColumnReader {
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        LongColumnVector vector = (LongColumnVector) vect;
-        vector.vector[row] = Long.parseLong(value);
-      }
-    }
-  }
-
-  static class DoubleColumnReader implements ColumnReader {
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        DoubleColumnVector vector = (DoubleColumnVector) vect;
-        vector.vector[row] = Double.parseDouble(value);
-      }
-    }
-  }
-
-  static class StringColumnReader implements ColumnReader {
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        BytesColumnVector vector = (BytesColumnVector) vect;
-        byte[] bytes = value.getBytes();
-        vector.setRef(row, bytes, 0, bytes.length);
-      }
-    }
-  }
-
-  static class TimestampColumnReader implements ColumnReader {
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        TimestampColumnVector vector = (TimestampColumnVector) vect;
-        vector.set(row, Timestamp.valueOf(value));
-      }
-    }
-  }
-
-  static class DecimalColumnReader implements ColumnReader {
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        DecimalColumnVector vector = (DecimalColumnVector) vect;
-        vector.vector[row].set(HiveDecimal.create(value));
-      }
-    }
-  }
-
-  ColumnReader createReader(TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        return new LongColumnReader();
-      case FLOAT:
-      case DOUBLE:
-        return new DoubleColumnReader();
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        return new StringColumnReader();
-      case DECIMAL:
-        return new DecimalColumnReader();
-      case TIMESTAMP:
-        return new TimestampColumnReader();
-      default:
-        throw new IllegalArgumentException("Unhandled type " + schema);
-    }
-  }
-
-  public CsvReader(Path path,
-                   Configuration conf,
-                   TypeDescription schema) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FSDataInputStream raw = fs.open(path);
-    String name = path.getName();
-    int lastDot = name.lastIndexOf(".");
-    InputStream input = raw;
-    if (lastDot >= 0) {
-      if (".gz".equals(name.substring(lastDot))) {
-        input = new DataInputStream(new GZIPInputStream(raw));
-      }
-    }
-    parser = new CSVParser(new InputStreamReader(input),
-        CSVFormat.RFC4180.withHeader()).iterator();
-    List<TypeDescription> columnTypes = schema.getChildren();
-    readers = new ColumnReader[columnTypes.size()];
-    int c = 0;
-    for(TypeDescription columnType: columnTypes) {
-      readers[c++] = createReader(columnType);
-    }
-  }
-
-  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
-    batch.reset();
-    int maxSize = batch.getMaxSize();
-    while (parser.hasNext() && batch.size < maxSize) {
-      CSVRecord record = parser.next();
-      int c = 0;
-      for(String val: record) {
-        readers[c].read(val, batch.cols[c], batch.size);
-        c += 1;
-      }
-      batch.size++;
-    }
-    return batch.size != 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/CsvScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/CsvScan.java
deleted file mode 100644
index f2ec61a..0000000
--- a/java/bench/src/java/org/apache/orc/bench/CsvScan.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-public class CsvScan {
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    long rowCount = 0;
-    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
-    for(String filename: args) {
-      CsvReader reader = new CsvReader(new Path(filename), conf, schema);
-      VectorizedRowBatch batch = schema.createRowBatch();
-      while (reader.nextBatch(batch)) {
-        rowCount += batch.size;
-      }
-    }
-    System.out.println("Rows read: " + rowCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
index 982db64..ee882e7 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.json.JsonReader;
 
 public class GithubToAvro {
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
index f5ae6b1..1dd23de 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
@@ -22,11 +22,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.json.JsonReader;
 import org.apache.orc.tools.FileDump;
 
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.zip.GZIPOutputStream;
 
 public class GithubToJson {
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
index cbc1997..ebd6443 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
@@ -24,9 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
-
-import java.io.IOException;
-import java.io.InputStream;
+import org.apache.orc.bench.json.JsonReader;
 
 public class GithubToOrc {
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
index e1fafdc..db88c52 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.json.JsonReader;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/JsonReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/JsonReader.java
deleted file mode 100644
index 599c872..0000000
--- a/java/bench/src/java/org/apache/orc/bench/JsonReader.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.bench;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonStreamParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.zip.GZIPInputStream;
-
-public class JsonReader {
-  private final TypeDescription schema;
-  private final JsonStreamParser parser;
-  private final JsonConverter[] converters;
-
-  interface JsonConverter {
-    void convert(JsonElement value, ColumnVector vect, int row);
-  }
-
-  static class BooleanColumnConverter implements JsonConverter {
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        LongColumnVector vector = (LongColumnVector) vect;
-        vector.vector[row] = value.getAsBoolean() ? 1 : 0;
-      }
-    }
-  }
-
-  static class LongColumnConverter implements JsonConverter {
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        LongColumnVector vector = (LongColumnVector) vect;
-        vector.vector[row] = value.getAsLong();
-      }
-    }
-  }
-
-  static class DoubleColumnConverter implements JsonConverter {
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        DoubleColumnVector vector = (DoubleColumnVector) vect;
-        vector.vector[row] = value.getAsDouble();
-      }
-    }
-  }
-
-  static class StringColumnConverter implements JsonConverter {
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        BytesColumnVector vector = (BytesColumnVector) vect;
-        byte[] bytes = value.getAsString().getBytes();
-        vector.setRef(row, bytes, 0, bytes.length);
-      }
-    }
-  }
-
-  static class BinaryColumnConverter implements JsonConverter {
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        BytesColumnVector vector = (BytesColumnVector) vect;
-        String binStr = value.getAsString();
-        byte[] bytes = new byte[binStr.length()/2];
-        for(int i=0; i < bytes.length; ++i) {
-          bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16);
-        }
-        vector.setRef(row, bytes, 0, bytes.length);
-      }
-    }
-  }
-
-  static class TimestampColumnConverter implements JsonConverter {
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        TimestampColumnVector vector = (TimestampColumnVector) vect;
-        vector.set(row, Timestamp.valueOf(value.getAsString()
-            .replaceAll("[TZ]", " ")));
-      }
-    }
-  }
-
-  static class DecimalColumnConverter implements JsonConverter {
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        DecimalColumnVector vector = (DecimalColumnVector) vect;
-        vector.vector[row].set(HiveDecimal.create(value.getAsString()));
-      }
-    }
-  }
-
-  static class StructColumnConverter implements JsonConverter {
-    private JsonConverter[] childrenConverters;
-    private List<String> fieldNames;
-
-    public StructColumnConverter(TypeDescription schema) {
-      List<TypeDescription> kids = schema.getChildren();
-      childrenConverters = new JsonConverter[kids.size()];
-      for(int c=0; c < childrenConverters.length; ++c) {
-        childrenConverters[c] = createConverter(kids.get(c));
-      }
-      fieldNames = schema.getFieldNames();
-    }
-
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        StructColumnVector vector = (StructColumnVector) vect;
-        JsonObject obj = value.getAsJsonObject();
-        for(int c=0; c < childrenConverters.length; ++c) {
-          JsonElement elem = obj.get(fieldNames.get(c));
-          childrenConverters[c].convert(elem, vector.fields[c], row);
-        }
-      }
-    }
-  }
-
-  static class ListColumnConverter implements JsonConverter {
-    private JsonConverter childrenConverter;
-
-    public ListColumnConverter(TypeDescription schema) {
-      childrenConverter = createConverter(schema.getChildren().get(0));
-    }
-
-    public void convert(JsonElement value, ColumnVector vect, int row) {
-      if (value == null || value.isJsonNull()) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        ListColumnVector vector = (ListColumnVector) vect;
-        JsonArray obj = value.getAsJsonArray();
-        vector.lengths[row] = obj.size();
-        vector.offsets[row] = vector.childCount;
-        vector.childCount += vector.lengths[row];
-        vector.child.ensureSize(vector.childCount, true);
-        for(int c=0; c < obj.size(); ++c) {
-          childrenConverter.convert(obj.get(c), vector.child,
-              (int) vector.offsets[row] + c);
-        }
-      }
-    }
-  }
-
-  static JsonConverter createConverter(TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        return new LongColumnConverter();
-      case FLOAT:
-      case DOUBLE:
-        return new DoubleColumnConverter();
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        return new StringColumnConverter();
-      case DECIMAL:
-        return new DecimalColumnConverter();
-      case TIMESTAMP:
-        return new TimestampColumnConverter();
-      case BINARY:
-        return new BinaryColumnConverter();
-      case BOOLEAN:
-        return new BooleanColumnConverter();
-      case STRUCT:
-        return new StructColumnConverter(schema);
-      case LIST:
-        return new ListColumnConverter(schema);
-      default:
-        throw new IllegalArgumentException("Unhandled type " + schema);
-    }
-  }
-
-  public JsonReader(Path path,
-                    Configuration conf,
-                    TypeDescription schema) throws IOException {
-    this.schema = schema;
-    FileSystem fs = path.getFileSystem(conf);
-    FSDataInputStream raw = fs.open(path);
-    String name = path.getName();
-    int lastDot = name.lastIndexOf(".");
-    InputStream input = raw;
-    if (lastDot >= 0) {
-      if (".gz".equals(name.substring(lastDot))) {
-        input = new GZIPInputStream(raw);
-      }
-    }
-    parser = new JsonStreamParser(new InputStreamReader(input));
-    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
-      throw new IllegalArgumentException("Root must be struct - " + schema);
-    }
-    List<TypeDescription> fieldTypes = schema.getChildren();
-    converters = new JsonConverter[fieldTypes.size()];
-    for(int c = 0; c < converters.length; ++c) {
-      converters[c] = createConverter(fieldTypes.get(c));
-    }
-  }
-
-  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
-    batch.reset();
-    int maxSize = batch.getMaxSize();
-    List<String> fieldNames = schema.getFieldNames();
-    while (parser.hasNext() && batch.size < maxSize) {
-      JsonObject elem = parser.next().getAsJsonObject();
-      for(int c=0; c < converters.length; ++c) {
-        // look up each field to see if it is in the input, otherwise
-        // set it to null.
-        JsonElement field = elem.get(fieldNames.get(c));
-        if (field == null) {
-          batch.cols[c].noNulls = false;
-          batch.cols[c].isNull[batch.size] = true;
-        } else {
-          converters[c].convert(field, batch.cols[c], batch.size);
-        }
-      }
-      batch.size++;
-    }
-    return batch.size != 0;
-  }
-}
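
JsonScan below counts rows with a raw JsonStreamParser and never touches this reader; for completeness, a sketch of how JsonReader itself is driven (hypothetical path and schema, types as imported above):

    Configuration conf = new Configuration();
    TypeDescription schema =
        TypeDescription.fromString("struct<id:bigint,type:string>");
    JsonReader reader = new JsonReader(new Path("events.json.gz"), conf, schema);
    VectorizedRowBatch batch = schema.createRowBatch();
    long rows = 0;
    while (reader.nextBatch(batch)) {
      rows += batch.size;
    }
    System.out.println("Rows read: " + rows);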

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/JsonScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/JsonScan.java b/java/bench/src/java/org/apache/orc/bench/JsonScan.java
deleted file mode 100644
index 1115ae6..0000000
--- a/java/bench/src/java/org/apache/orc/bench/JsonScan.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import com.google.gson.JsonStreamParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.zip.GZIPInputStream;
-
-public class JsonScan {
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
-    long rowCount = 0;
-    for(String filename: args) {
-      Path path = new Path(filename);
-      FileSystem fs = path.getFileSystem(conf);
-      FSDataInputStream raw = fs.open(path);
-      int lastDot = filename.lastIndexOf(".");
-      InputStream input = raw;
-      if (lastDot >= 0) {
-        if (".gz".equals(filename.substring(lastDot))) {
-          input = new GZIPInputStream(raw);
-        }
-      }
-      JsonStreamParser parser =
-          new JsonStreamParser(new InputStreamReader(input));
-      while (parser.hasNext()) {
-        parser.next();
-        rowCount += 1;
-      }
-    }
-    System.out.println("Rows read: " + rowCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/OrcScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/OrcScan.java b/java/bench/src/java/org/apache/orc/bench/OrcScan.java
deleted file mode 100644
index 096f3fa..0000000
--- a/java/bench/src/java/org/apache/orc/bench/OrcScan.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-public class OrcScan {
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
-    long rowCount = 0;
-    for(String filename: args) {
-      Reader reader = OrcFile.createReader(new Path(filename), options);
-      TypeDescription schema = reader.getSchema();
-      RecordReader rows = reader.rows();
-      VectorizedRowBatch batch = schema.createRowBatch();
-      while (rows.nextBatch(batch)) {
-        rowCount += batch.size;
-      }
-      rows.close();
-    }
-    System.out.println("Rows read: " + rowCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/ParquetScan.java b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java
deleted file mode 100644
index ccaaa2a..0000000
--- a/java/bench/src/java/org/apache/orc/bench/ParquetScan.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-
-public class ParquetScan {
-  public static void main(String[] args) throws Exception {
-    JobConf conf = new JobConf();
-    long rowCount = 0;
-    ParquetInputFormat<ArrayWritable> inputFormat =
-        new ParquetInputFormat<>(DataWritableReadSupport.class);
-
-    NullWritable nada = NullWritable.get();
-    for(String filename: args) {
-      FileSplit split = new FileSplit(new Path(filename), 0, Long.MAX_VALUE,
-          new String[]{});
-      RecordReader<NullWritable,ArrayWritable> recordReader =
-          new ParquetRecordReaderWrapper(inputFormat, split, conf,
-              Reporter.NULL);
-      ArrayWritable value = recordReader.createValue();
-      while (recordReader.next(nada, value)) {
-        rowCount += 1;
-      }
-      recordReader.close();
-    }
-    System.out.println("Rows read: " + rowCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
index d4fd4a2..900be66 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
 
 public class SalesToAvro {
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
index 985da90..3da900f 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
 
 import java.util.Properties;
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
index b490a8a..2b14f50 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.csv.CsvReader;
 
 public class TaxiToAvro {
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
index 98fbe17..4b8ca8c 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.csv.CsvReader;
 import org.apache.orc.tools.FileDump;
 import org.iq80.snappy.SnappyOutputStream;
 

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
index dee5da6..2588c72 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
@@ -25,6 +25,7 @@ import org.apache.orc.OrcFile;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.bench.csv.CsvReader;
 
 import java.io.IOException;
 import java.io.InputStream;

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
index 3edce17..3eafc87 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.csv.CsvReader;
 
 import java.util.Properties;
 public class TaxiToParquet {

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java
new file mode 100644
index 0000000..1292c2b
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class AvroScan {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    long rowCount = 0;
+    for(String filename: args) {
+      FsInput file = new FsInput(new Path(filename), conf);
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+      DataFileReader<GenericRecord> dataFileReader =
+          new DataFileReader<>(file, datumReader);
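+      // Reuse one record object across rows to avoid per-row allocation.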
+      GenericRecord record = null;
+      while (dataFileReader.hasNext()) {
+        record = dataFileReader.next(record);
+        rowCount += 1;
+      }
+      dataFileReader.close();
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java
new file mode 100644
index 0000000..5df7b70
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.avro;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Converts an ORC TypeDescription into an Avro Schema.
+ */
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {
+    // No instances
+  }
+
+  public static Schema createAvroSchema(TypeDescription typeInfo) {
+    Schema schema;
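+    // Map each ORC category onto an Avro type; char, varchar, decimal,
+    // date, and timestamp are expressed with Avro logical type annotations.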
+    switch (typeInfo.getCategory()) {
+      case STRING:
+        schema = Schema.create(Schema.Type.STRING);
+        break;
+      case CHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"char\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case VARCHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"varchar\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case BINARY:
+        schema = Schema.create(Schema.Type.BYTES);
+        break;
+      case BYTE:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case SHORT:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case INT:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case LONG:
+        schema = Schema.create(Schema.Type.LONG);
+        break;
+      case FLOAT:
+        schema = Schema.create(Schema.Type.FLOAT);
+        break;
+      case DOUBLE:
+        schema = Schema.create(Schema.Type.DOUBLE);
+        break;
+      case BOOLEAN:
+        schema = Schema.create(Schema.Type.BOOLEAN);
+        break;
+      case DECIMAL:
+        String precision = String.valueOf(typeInfo.getPrecision());
+        String scale = String.valueOf(typeInfo.getScale());
+        schema = getSchemaFor("{" +
+            "\"type\":\"bytes\"," +
+            "\"logicalType\":\"decimal\"," +
+            "\"precision\":" + precision + "," +
+            "\"scale\":" + scale + "}");
+        break;
+      case DATE:
+        schema = getSchemaFor("{" +
+            "\"type\":\"int\"," +
+            "\"logicalType\":\"date\"}");
+        break;
+      case TIMESTAMP:
+        schema = getSchemaFor("{" +
+            "\"type\":\"long\"," +
+            "\"logicalType\":\"timestamp-millis\"}");
+        break;
+      case LIST:
+        schema = createAvroArray(typeInfo);
+        break;
+      case MAP:
+        schema = createAvroMap(typeInfo);
+        break;
+      case STRUCT:
+        schema = createAvroRecord(typeInfo);
+        break;
+      case UNION:
+        schema = createAvroUnion(typeInfo);
+        break;
+      default:
+        throw new UnsupportedOperationException(typeInfo + " is not supported.");
+    }
+
+    return wrapInUnionWithNull(schema);
+  }
+
+  private static Schema createAvroUnion(TypeDescription typeInfo) {
+    List<Schema> childSchemas = new ArrayList<>();
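+    // Avro forbids a union as a direct member of another union, so flatten
+    // nested unions and drop their null branches; the caller re-adds null.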
+    for (TypeDescription childTypeInfo : typeInfo.getChildren()) {
+      Schema childSchema = createAvroSchema(childTypeInfo);
+      if (childSchema.getType() == Schema.Type.UNION) {
+        for (Schema grandkid: childSchema.getTypes()) {
+          if (grandkid.getType() != Schema.Type.NULL) {
+            childSchemas.add(grandkid);
+          }
+        }
+      } else {
+        childSchemas.add(childSchema);
+      }
+    }
+
+    return Schema.createUnion(childSchemas);
+  }
+
+  private static Schema createAvroRecord(TypeDescription typeInfo) {
+    List<Schema.Field> childFields = new ArrayList<>();
+
+    List<String> fieldNames = typeInfo.getFieldNames();
+    List<TypeDescription> fieldTypes = typeInfo.getChildren();
+
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      TypeDescription childTypeInfo = fieldTypes.get(i);
+      Schema.Field field = new Schema.Field(fieldNames.get(i),
+          createAvroSchema(childTypeInfo), childTypeInfo.toString(),
+          (Object) null);
+      childFields.add(field);
+    }
+
+    Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(),
+        typeInfo.toString(), null, false);
+    recordSchema.setFields(childFields);
+    return recordSchema;
+  }
+
+  private static Schema createAvroMap(TypeDescription typeInfo) {
+    TypeDescription keyTypeInfo = typeInfo.getChildren().get(0);
+    if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) {
+      throw new UnsupportedOperationException(
+          "Avro only supports maps with string keys: " + typeInfo);
+    }
+
+    Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1));
+
+    return Schema.createMap(valueSchema);
+  }
+
+  private static Schema createAvroArray(TypeDescription typeInfo) {
+    Schema child = createAvroSchema(typeInfo.getChildren().get(0));
+    return Schema.createArray(child);
+  }
+
+  private static Schema wrapInUnionWithNull(Schema schema) {
+    Schema NULL = Schema.create(Schema.Type.NULL);
+    switch (schema.getType()) {
+      case NULL:
+        return schema;
+      case UNION:
+        List<Schema> kids = schema.getTypes();
+        List<Schema> newKids = new ArrayList<>(kids.size() + 1);
+        newKids.add(NULL);
+        newKids.addAll(kids);
+        return Schema.createUnion(newKids);
+      default:
+        return Schema.createUnion(Arrays.asList(NULL, schema));
+    }
+  }
+
+  private static Schema getSchemaFor(String str) {
+    Schema.Parser parser = new Schema.Parser();
+    return parser.parse(str);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
new file mode 100644
index 0000000..f9d3bad
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+public class AvroWriter {
+
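+  // Builds the "columns" and "columns.types" table properties that the
+  // Hive SerDes use to describe the schema.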
+  static Properties setHiveSchema(TypeDescription schema) {
+    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
+      throw new IllegalArgumentException("Assumes struct type as root, not " +
+          schema);
+    }
+    StringBuilder fieldNames = new StringBuilder();
+    StringBuilder fieldTypes = new StringBuilder();
+    List<String> childNames = schema.getFieldNames();
+    List<TypeDescription> childTypes = schema.getChildren();
+    for(int f=0; f < childNames.size(); ++f) {
+      if (f != 0) {
+        fieldNames.append(',');
+        fieldTypes.append(',');
+      }
+      fieldNames.append(childNames.get(f));
+      fieldTypes.append(childTypes.get(f).toString());
+    }
+    Properties properties = new Properties();
+    properties.put("columns", fieldNames.toString());
+    properties.put("columns.types", fieldTypes.toString());
+    return properties;
+  }
+
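+  /**
+   * Converts one cell of a vectorized row batch into the matching Avro
+   * value, or null when the cell is null.
+   */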
+  interface AvroConverter {
+    Object convert(ColumnVector vector, int row);
+  }
+
+  private static class BooleanConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        LongColumnVector vector = (LongColumnVector) cv;
+        return vector.vector[row] != 0;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class IntConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        LongColumnVector vector = (LongColumnVector) cv;
+        return (int) vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class LongConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        LongColumnVector vector = (LongColumnVector) cv;
+        return vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class FloatConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        DoubleColumnVector vector = (DoubleColumnVector) cv;
+        return (float) vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class DoubleConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        DoubleColumnVector vector = (DoubleColumnVector) cv;
+        return vector.vector[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class StringConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        BytesColumnVector vector = (BytesColumnVector) cv;
+        return new String(vector.vector[row], vector.start[row],
+            vector.length[row]);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class BinaryConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        BytesColumnVector vector = (BytesColumnVector) cv;
+        return ByteBuffer.wrap(vector.vector[row], vector.start[row],
+            vector.length[row]);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class TimestampConverter implements AvroConverter {
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        TimestampColumnVector vector = (TimestampColumnVector) cv;
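+        // TimestampColumnVector.time holds milliseconds since the epoch,
+        // which matches the timestamp-millis logical type used above.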
+        return vector.time[row];
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class DecimalConverter implements AvroConverter {
+    final int scale;
+    DecimalConverter(int scale) {
+      this.scale = scale;
+    }
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        DecimalColumnVector vector = (DecimalColumnVector) cv;
+        return getBufferFromDecimal(
+            vector.vector[row].getHiveDecimal(), scale);
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class ListConverter implements AvroConverter {
+    final Schema avroSchema;
+    final AvroConverter childConverter;
+
+    ListConverter(TypeDescription schema, Schema avroSchema) {
+      this.avroSchema = avroSchema;
+      childConverter = createConverter(schema.getChildren().get(0),
+          removeNullable(avroSchema.getElementType()));
+    }
+
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        ListColumnVector vector = (ListColumnVector) cv;
+        int offset = (int) vector.offsets[row];
+        int length = (int) vector.lengths[row];
+        GenericData.Array<Object> result = new GenericData.Array<>(length, avroSchema);
+        for(int i=0; i < length; ++i) {
+          result.add(childConverter.convert(vector.child, offset + i));
+        }
+        return result;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private static class StructConverter implements AvroConverter {
+    final Schema avroSchema;
+    final AvroConverter[] childConverters;
+
+    StructConverter(TypeDescription schema, Schema avroSchema) {
+      this.avroSchema = avroSchema;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      childConverters = new AvroConverter[childrenTypes.size()];
+      List<Schema.Field> fields = avroSchema.getFields();
+      for(int f=0; f < childConverters.length; ++f) {
+        childConverters[f] = createConverter(childrenTypes.get(f),
+            removeNullable(fields.get(f).schema()));
+      }
+    }
+
+    public Object convert(ColumnVector cv, int row) {
+      if (cv.isRepeating) {
+        row = 0;
+      }
+      if (cv.noNulls || !cv.isNull[row]) {
+        StructColumnVector vector = (StructColumnVector) cv;
+        GenericData.Record result = new GenericData.Record(avroSchema);
+        for(int f=0; f < childConverters.length; ++f) {
+          result.put(f, childConverters[f].convert(vector.fields[f], row));
+        }
+        return result;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  static AvroConverter createConverter(TypeDescription types,
+                                       Schema avroSchema) {
+    switch (types.getCategory()) {
+      case BINARY:
+        return new BinaryConverter();
+      case BOOLEAN:
+        return new BooleanConverter();
+      case BYTE:
+      case SHORT:
+      case INT:
+        return new IntConverter();
+      case LONG:
+        return new LongConverter();
+      case FLOAT:
+        return new FloatConverter();
+      case DOUBLE:
+        return new DoubleConverter();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new StringConverter();
+      case TIMESTAMP:
+        return new TimestampConverter();
+      case DECIMAL:
+        return new DecimalConverter(types.getScale());
+      case LIST:
+        return new ListConverter(types, avroSchema);
+      case STRUCT:
+        return new StructConverter(types, avroSchema);
+      default:
+        throw new IllegalArgumentException("Unhandled type " + types);
+    }
+  }
+
+  /**
+   * Remove the union(null, ...) wrapper around the schema.
+   *
+   * All of the types in Hive are nullable, and in Avro that is represented
+   * by wrapping each type in a union with the null type.
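+   * For example, union(null, int) unwraps to int, while a wider union
+   * such as union(null, int, string) is returned unchanged.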
+   * @param avro The avro type
+   * @return The avro type with the nullable layer removed
+   */
+  static Schema removeNullable(Schema avro) {
+    while (avro.getType() == Schema.Type.UNION) {
+      List<Schema> children = avro.getTypes();
+      if (children.size() == 2 &&
+          children.get(0).getType() == Schema.Type.NULL) {
+        avro = children.get(1);
+      } else {
+        break;
+      }
+    }
+    return avro;
+  }
+
+  private final AvroConverter[] converters;
+  private final DataFileWriter<GenericRecord> writer;
+  private final GenericRecord record;
+
+  public AvroWriter(Path path, TypeDescription schema,
+                    Configuration conf,
+                    String compression) throws IOException {
+    List<TypeDescription> childTypes = schema.getChildren();
+    Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema);
+    List<Schema.Field> avroFields = avroSchema.getFields();
+    converters = new AvroConverter[childTypes.size()];
+    for(int c=0; c < converters.length; ++c) {
+      converters[c] = createConverter(childTypes.get(c),
+          removeNullable(avroFields.get(c).schema()));
+    }
+    GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>(avroSchema);
+    writer = new DataFileWriter<>(gdw);
+    if (compression != null && !"".equals(compression)) {
+      writer.setCodec(CodecFactory.fromString(compression));
+    }
+    writer.create(avroSchema, path.getFileSystem(conf).create(path));
+    record = new GenericData.Record(avroSchema);
+  }
+
+  public void writeBatch(VectorizedRowBatch batch) throws IOException {
+    for(int r=0; r < batch.size; ++r) {
+      for(int f=0; f < batch.cols.length; ++f) {
+        record.put(f, converters[f].convert(batch.cols[f], r));
+      }
+      writer.append(record);
+    }
+  }
+
+  public void close() throws IOException {
+    writer.close();
+  }
+
+  static Buffer getBufferFromBytes(byte[] input) {
+    ByteBuffer bb = ByteBuffer.wrap(input);
+    return bb.rewind();
+  }
+
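+  // Avro's decimal logical type expects the unscaled value encoded as
+  // big-endian two's-complement bytes, which BigInteger.toByteArray yields.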
+  public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) {
+    if (dec == null) {
+      return null;
+    }
+
+    dec = dec.setScale(scale);
+    return getBufferFromBytes(dec.unscaledValue().toByteArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java
new file mode 100644
index 0000000..e99ee8f
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+public class CsvReader {
+  private final Iterator<CSVRecord> parser;
+  private final ColumnReader[] readers;
+
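+  /**
+   * Parses one CSV field into a column vector slot; an empty string is
+   * treated as null.
+   */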
+  interface ColumnReader {
+    void read(String value, ColumnVector vect, int row);
+  }
+
+  static class LongColumnReader implements ColumnReader {
+    public void read(String value, ColumnVector vect, int row) {
+      if ("".equals(value)) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        LongColumnVector vector = (LongColumnVector) vect;
+        vector.vector[row] = Long.parseLong(value);
+      }
+    }
+  }
+
+  static class DoubleColumnReader implements ColumnReader {
+    public void read(String value, ColumnVector vect, int row) {
+      if ("".equals(value)) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        DoubleColumnVector vector = (DoubleColumnVector) vect;
+        vector.vector[row] = Double.parseDouble(value);
+      }
+    }
+  }
+
+  static class StringColumnReader implements ColumnReader {
+    public void read(String value, ColumnVector vect, int row) {
+      if ("".equals(value)) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        BytesColumnVector vector = (BytesColumnVector) vect;
+        byte[] bytes = value.getBytes();
+        vector.setRef(row, bytes, 0, bytes.length);
+      }
+    }
+  }
+
+  static class TimestampColumnReader implements ColumnReader {
+    public void read(String value, ColumnVector vect, int row) {
+      if ("".equals(value)) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        TimestampColumnVector vector = (TimestampColumnVector) vect;
+        vector.set(row, Timestamp.valueOf(value));
+      }
+    }
+  }
+
+  static class DecimalColumnReader implements ColumnReader {
+    public void read(String value, ColumnVector vect, int row) {
+      if ("".equals(value)) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        DecimalColumnVector vector = (DecimalColumnVector) vect;
+        vector.vector[row].set(HiveDecimal.create(value));
+      }
+    }
+  }
+
+  ColumnReader createReader(TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return new LongColumnReader();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleColumnReader();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new StringColumnReader();
+      case DECIMAL:
+        return new DecimalColumnReader();
+      case TIMESTAMP:
+        return new TimestampColumnReader();
+      default:
+        throw new IllegalArgumentException("Unhandled type " + schema);
+    }
+  }
+
+  public CsvReader(Path path,
+                   Configuration conf,
+                   TypeDescription schema) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    FSDataInputStream raw = fs.open(path);
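+    // Transparently decompress input files whose names end in ".gz".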
+    String name = path.getName();
+    int lastDot = name.lastIndexOf(".");
+    InputStream input = raw;
+    if (lastDot >= 0) {
+      if (".gz".equals(name.substring(lastDot))) {
+        input = new DataInputStream(new GZIPInputStream(raw));
+      }
+    }
+    parser = new CSVParser(new InputStreamReader(input),
+        CSVFormat.RFC4180.withHeader()).iterator();
+    List<TypeDescription> columnTypes = schema.getChildren();
+    readers = new ColumnReader[columnTypes.size()];
+    int c = 0;
+    for(TypeDescription columnType: columnTypes) {
+      readers[c++] = createReader(columnType);
+    }
+  }
+
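+  /**
+   * Fills the batch with up to batch.getMaxSize() rows.
+   * @return true if at least one row was read
+   */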
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    batch.reset();
+    int maxSize = batch.getMaxSize();
+    while (parser.hasNext() && batch.size < maxSize) {
+      CSVRecord record = parser.next();
+      int c = 0;
+      for(String val: record) {
+        readers[c].read(val, batch.cols[c], batch.size);
+        c += 1;
+      }
+      batch.size++;
+    }
+    return batch.size != 0;
+  }
+}