Posted to commits@orc.apache.org by om...@apache.org on 2017/07/17 18:25:19 UTC

orc git commit: ORC-199. Add convert from CSV.

Repository: orc
Updated Branches:
  refs/heads/master 96a4ea968 -> 7e5d2cc0f


ORC-199. Add convert from CSV.

Fixes #131.

Signed-off-by: Owen O'Malley <om...@apache.org>
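
For reference, a usage sketch of the new CSV path, based on the options this change adds (the tools jar name and invocation form are illustrative; a --schema must be given for CSV input since the schema cannot be inferred from CSV, and ".gz" files are decompressed automatically):

  % java -jar orc-tools-*-uber.jar convert data.csv.gz \
        --schema 'struct<name:string,age:int,salary:decimal(10,2)>' \
        --header 1 --null 'NULL' --output data.orc

The remaining CSV options default to ',' for --separator, '"' for --quote, '\' for --escape, zero --header lines, the empty string for --null, and output.orc for --output.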


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/7e5d2cc0
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/7e5d2cc0
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/7e5d2cc0

Branch: refs/heads/master
Commit: 7e5d2cc0f9392a2434f6a3de3994c031fdd85c1b
Parents: 96a4ea9
Author: Owen O'Malley <om...@apache.org>
Authored: Tue May 23 13:54:44 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Jul 17 11:23:44 2017 -0700

----------------------------------------------------------------------
 java/pom.xml                                    |   5 +
 java/tools/pom.xml                              |   4 +
 .../src/java/org/apache/orc/tools/Driver.java   |   2 +-
 .../apache/orc/tools/convert/ConvertTool.java   | 197 +++++++++++++--
 .../org/apache/orc/tools/convert/CsvReader.java | 246 +++++++++++++++++++
 .../apache/orc/tools/convert/JsonReader.java    |  31 +--
 .../apache/orc/tools/json/JsonSchemaFinder.java |  64 +++++
 .../apache/orc/tools/convert/TestCsvReader.java | 179 ++++++++++++++
 8 files changed, 691 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 25c62b1..879c6f0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -342,6 +342,11 @@
         <version>2.5.0</version>
       </dependency>
       <dependency>
+        <groupId>com.opencsv</groupId>
+        <artifactId>opencsv</artifactId>
+        <version>3.9</version>
+      </dependency>
+      <dependency>
         <groupId>commons-cli</groupId>
         <artifactId>commons-cli</artifactId>
         <version>1.3.1</version>

http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/tools/pom.xml b/java/tools/pom.xml
index 6114f07..6366e1e 100644
--- a/java/tools/pom.xml
+++ b/java/tools/pom.xml
@@ -43,6 +43,10 @@
       <artifactId>gson</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.opencsv</groupId>
+      <artifactId>opencsv</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/Driver.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/Driver.java b/java/tools/src/java/org/apache/orc/tools/Driver.java
index 9bba013..ae8f5e1 100644
--- a/java/tools/src/java/org/apache/orc/tools/Driver.java
+++ b/java/tools/src/java/org/apache/orc/tools/Driver.java
@@ -89,7 +89,7 @@ public class Driver {
       System.err.println("   meta - print the metadata about the ORC file");
       System.err.println("   data - print the data from the ORC file");
       System.err.println("   scan - scan the ORC file");
-      System.err.println("   convert - convert JSON files to ORC");
+      System.err.println("   convert - convert CSV and JSON files to ORC");
       System.err.println("   json-schema - scan JSON files to determine their schema");
       System.err.println();
       System.err.println("To get more help, provide -h to the command");

http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java b/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
index 6211bd8..33f1cf5 100644
--- a/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
+++ b/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,53 +18,185 @@
 package org.apache.orc.tools.convert;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 import org.apache.orc.tools.json.JsonSchemaFinder;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
 
 /**
- * A conversion tool to convert JSON files into ORC files.
+ * A conversion tool to convert CSV or JSON files into ORC files.
  */
 public class ConvertTool {
+  private final List<FileInformation> fileList;
+  private final TypeDescription schema;
+  private final char csvSeparator;
+  private final char csvQuote;
+  private final char csvEscape;
+  private final int csvHeaderLines;
+  private final String csvNullString;
+  private final Writer writer;
+  private final VectorizedRowBatch batch;
 
-  static TypeDescription computeSchema(String[] filename) throws IOException {
+  TypeDescription buildSchema(List<FileInformation> files,
+                              Configuration conf) throws IOException {
     JsonSchemaFinder schemaFinder = new JsonSchemaFinder();
-    for(String file: filename) {
-      System.err.println("Scanning " + file + " for schema");
-      schemaFinder.addFile(file);
+    int filesScanned = 0;
+    for(FileInformation file: files) {
+      if (file.format == Format.JSON) {
+        System.err.println("Scanning " + file.path + " for schema");
+        filesScanned += 1;
+        schemaFinder.addFile(file.getReader(file.filesystem.open(file.path)));
+      } else if (file.format == Format.ORC) {
+        System.err.println("Merging schema from " + file.path);
+        filesScanned += 1;
+        Reader reader = OrcFile.createReader(file.path,
+            OrcFile.readerOptions(conf)
+                .filesystem(file.filesystem));
+        schemaFinder.addSchema(reader.getSchema());
+      }
+    }
+    if (filesScanned == 0) {
+      throw new IllegalArgumentException("Please specify a schema using" +
+          " --schema for converting CSV files.");
     }
     return schemaFinder.getSchema();
   }
 
+  enum Compression {
+    NONE, GZIP
+  }
+
+  enum Format {
+    JSON, CSV, ORC
+  }
+
+  class FileInformation {
+    private final Compression compression;
+    private final Format format;
+    private final Path path;
+    private final FileSystem filesystem;
+    private final Configuration conf;
+    private final long size;
+
+    FileInformation(Path path, Configuration conf) throws IOException {
+      this.path = path;
+      this.conf = conf;
+      this.filesystem = path.getFileSystem(conf);
+      this.size = filesystem.getFileStatus(path).getLen();
+      String name = path.getName();
+      int lastDot = name.lastIndexOf(".");
+      if (lastDot >= 0 && ".gz".equals(name.substring(lastDot))) {
+        this.compression = Compression.GZIP;
+        name = name.substring(0, lastDot);
+        lastDot = name.lastIndexOf(".");
+      } else {
+        this.compression = Compression.NONE;
+      }
+      if (lastDot >= 0) {
+        String ext = name.substring(lastDot);
+        if (".json".equals(ext) || ".jsn".equals(ext)) {
+          format = Format.JSON;
+        } else if (".csv".equals(ext)) {
+          format = Format.CSV;
+        } else if (".orc".equals(ext)) {
+          format = Format.ORC;
+        } else {
+          throw new IllegalArgumentException("Unknown kind of file " + path);
+        }
+      } else {
+        throw new IllegalArgumentException("No extension on file " + path);
+      }
+    }
+
+    java.io.Reader getReader(InputStream input) throws IOException {
+      if (compression == Compression.GZIP) {
+        input = new GZIPInputStream(input);
+      }
+      return new InputStreamReader(input, StandardCharsets.UTF_8);
+    }
+
+    public RecordReader getRecordReader() throws IOException {
+      switch (format) {
+        case ORC: {
+          Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+          return reader.rows(reader.options().schema(schema));
+        }
+        case JSON: {
+          FSDataInputStream underlying = filesystem.open(path);
+          return new JsonReader(getReader(underlying), underlying, size, schema);
+        }
+        case CSV: {
+          FSDataInputStream underlying = filesystem.open(path);
+          return new CsvReader(getReader(underlying), underlying, size, schema,
+              csvSeparator, csvQuote, csvEscape, csvHeaderLines, csvNullString);
+        }
+        default:
+          throw new IllegalArgumentException("Unhandled format " + format +
+              " for " + path);
+      }
+    }
+  }
+
   public static void main(Configuration conf,
                           String[] args) throws IOException, ParseException {
+    new ConvertTool(conf, args).run();
+  }
+
+
+  List<FileInformation> buildFileList(String[] files,
+                                      Configuration conf) throws IOException {
+    List<FileInformation> result = new ArrayList<>(files.length);
+    for(String fn: files) {
+      result.add(new FileInformation(new Path(fn), conf));
+    }
+    return result;
+  }
+
+  public ConvertTool(Configuration conf,
+                     String[] args) throws IOException, ParseException {
     CommandLine opts = parseOptions(args);
-    TypeDescription schema;
+    fileList = buildFileList(opts.getArgs(), conf);
     if (opts.hasOption('s')) {
-      schema = TypeDescription.fromString(opts.getOptionValue('s'));
+      this.schema = TypeDescription.fromString(opts.getOptionValue('s'));
     } else {
-      schema = computeSchema(opts.getArgs());
+      this.schema = buildSchema(fileList, conf);
     }
+    this.csvQuote = getCharOption(opts, 'q', '"');
+    this.csvEscape = getCharOption(opts, 'e', '\\');
+    this.csvSeparator = getCharOption(opts, 'S', ',');
+    this.csvHeaderLines = getIntOption(opts, 'H', 0);
+    this.csvNullString = opts.getOptionValue('n', "");
     String outFilename = opts.hasOption('o')
         ? opts.getOptionValue('o') : "output.orc";
-    Writer writer = OrcFile.createWriter(new Path(outFilename),
+    writer = OrcFile.createWriter(new Path(outFilename),
         OrcFile.writerOptions(conf).setSchema(schema));
-    VectorizedRowBatch batch = schema.createRowBatch();
-    for (String file: opts.getArgs()) {
-      System.err.println("Processing " + file);
-      RecordReader reader = new JsonReader(new Path(file), schema, conf);
+    batch = schema.createRowBatch();
+  }
+
+  void run() throws IOException {
+    for (FileInformation file: fileList) {
+      System.err.println("Processing " + file.path);
+      RecordReader reader = file.getRecordReader();
       while (reader.nextBatch(batch)) {
         writer.addRowBatch(batch);
       }
@@ -73,7 +205,23 @@ public class ConvertTool {
     writer.close();
   }
 
-  static CommandLine parseOptions(String[] args) throws ParseException {
+  private static int getIntOption(CommandLine opts, char letter, int mydefault) {
+    if (opts.hasOption(letter)) {
+      return Integer.parseInt(opts.getOptionValue(letter));
+    } else {
+      return mydefault;
+    }
+  }
+
+  private static char getCharOption(CommandLine opts, char letter, char mydefault) {
+    if (opts.hasOption(letter)) {
+      return opts.getOptionValue(letter).charAt(0);
+    } else {
+      return mydefault;
+    }
+  }
+
+  private static CommandLine parseOptions(String[] args) throws ParseException {
     Options options = new Options();
 
     options.addOption(
@@ -84,7 +232,22 @@ public class ConvertTool {
     options.addOption(
         Option.builder("o").longOpt("output").desc("Output filename")
             .hasArg().build());
-    CommandLine cli = new GnuParser().parse(options, args);
+    options.addOption(
+        Option.builder("n").longOpt("null").desc("CSV null string")
+            .hasArg().build());
+    options.addOption(
+        Option.builder("q").longOpt("quote").desc("CSV quote character")
+            .hasArg().build());
+    options.addOption(
+        Option.builder("e").longOpt("escape").desc("CSV escape character")
+            .hasArg().build());
+    options.addOption(
+        Option.builder("S").longOpt("separator").desc("CSV separator character")
+            .hasArg().build());
+    options.addOption(
+        Option.builder("H").longOpt("header").desc("CSV header lines")
+            .hasArg().build());
+    CommandLine cli = new DefaultParser().parse(options, args);
     if (cli.hasOption('h') || cli.getArgs().length == 0) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp("convert", options);

http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
new file mode 100644
index 0000000..3aa6f1a
--- /dev/null
+++ b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
@@ -0,0 +1,246 @@
+package org.apache.orc.tools.convert;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import com.opencsv.CSVReader;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class CsvReader implements RecordReader {
+
+  private long rowNumber = 0;
+  private final Converter converter;
+  private final int columns;
+  private final CSVReader reader;
+  private final String nullString;
+  private final FSDataInputStream underlying;
+  private final long totalSize;
+
+  /**
+   * Create a CSV reader
+   * @param reader the stream to read from
+   * @param input the underlying file that is only used for getting the
+   *              position within the file
+   * @param size the number of bytes in the underlying stream
+   * @param schema the schema to read into
+   * @param separatorChar the character between fields
+   * @param quoteChar the quote character
+   * @param escapeChar the escape character
+   * @param headerLines the number of header lines
+   * @param nullString the string that is translated to null
+   * @throws IOException
+   */
+  public CsvReader(java.io.Reader reader,
+                   FSDataInputStream input,
+                   long size,
+                   TypeDescription schema,
+                   char separatorChar,
+                   char quoteChar,
+                   char escapeChar,
+                   int headerLines,
+                   String nullString) throws IOException {
+    this.underlying = input;
+    this.reader = new CSVReader(reader, separatorChar, quoteChar, escapeChar,
+        headerLines);
+    this.nullString = nullString;
+    this.totalSize = size;
+    IntWritable nextColumn = new IntWritable(0);
+    this.converter = buildConverter(nextColumn, schema);
+    this.columns = nextColumn.get();
+  }
+
+  interface Converter {
+    void convert(String[] values, VectorizedRowBatch batch, int row);
+    void convert(String[] values, ColumnVector column, int row);
+  }
+
+  @Override
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    batch.reset();
+    final int BATCH_SIZE = batch.getMaxSize();
+    String[] nextLine;
+    // Read the CSV rows and place them into the column vectors.
+    while ((nextLine = reader.readNext()) != null) {
+      rowNumber++;
+      if (nextLine.length != columns &&
+          !(nextLine.length == columns + 1 && "".equals(nextLine[columns]))) {
+        throw new IllegalArgumentException("Wrong number of columns on line " +
+            rowNumber + ". Expected " + columns + ", but got " +
+            nextLine.length + ".");
+      }
+      converter.convert(nextLine, batch, batch.size++);
+      if (batch.size == BATCH_SIZE) {
+        break;
+      }
+    }
+    return batch.size != 0;
+  }
+
+  @Override
+  public long getRowNumber() throws IOException {
+    return rowNumber;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    long pos = underlying.getPos();
+    return totalSize != 0 && pos < totalSize ? (float) pos / totalSize : 1;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public void seekToRow(long rowCount) throws IOException {
+    throw new UnsupportedOperationException("Seeking not supported");
+  }
+
+  abstract class ConverterImpl implements Converter {
+    final int offset;
+
+    ConverterImpl(IntWritable offset) {
+      this.offset = offset.get();
+      offset.set(this.offset + 1);
+    }
+
+    @Override
+    public void convert(String[] values, VectorizedRowBatch batch, int row) {
+      convert(values, batch.cols[0], row);
+    }
+  }
+
+  class LongConverter extends ConverterImpl {
+    LongConverter(IntWritable offset) {
+      super(offset);
+    }
+
+    @Override
+    public void convert(String[] values, ColumnVector column, int row) {
+      if (values[offset] == null || nullString.equals(values[offset])) {
+        column.noNulls = false;
+        column.isNull[row] = true;
+      } else {
+        ((LongColumnVector) column).vector[row] =
+            Long.parseLong(values[offset]);
+      }
+    }
+  }
+
+  class DoubleConverter extends ConverterImpl {
+    DoubleConverter(IntWritable offset) {
+      super(offset);
+    }
+
+    @Override
+    public void convert(String[] values, ColumnVector column, int row) {
+      if (values[offset] == null || nullString.equals(values[offset])) {
+        column.noNulls = false;
+        column.isNull[row] = true;
+      } else {
+        ((DoubleColumnVector) column).vector[row] =
+            Double.parseDouble(values[offset]);
+      }
+    }
+  }
+
+  class DecimalConverter extends ConverterImpl {
+    DecimalConverter(IntWritable offset) {
+      super(offset);
+    }
+
+    @Override
+    public void convert(String[] values, ColumnVector column, int row) {
+      if (values[offset] == null || nullString.equals(values[offset])) {
+        column.noNulls = false;
+        column.isNull[row] = true;
+      } else {
+        ((DecimalColumnVector) column).vector[row].set(
+            new HiveDecimalWritable(values[offset]));
+      }
+    }
+  }
+
+  class BytesConverter extends ConverterImpl {
+    BytesConverter(IntWritable offset) {
+      super(offset);
+    }
+
+    @Override
+    public void convert(String[] values, ColumnVector column, int row) {
+      if (values[offset] == null || nullString.equals(values[offset])) {
+        column.noNulls = false;
+        column.isNull[row] = true;
+      } else {
+        byte[] value = values[offset].getBytes(StandardCharsets.UTF_8);
+        ((BytesColumnVector) column).setRef(row, value, 0, value.length);
+      }
+    }
+  }
+
+  class StructConverter implements Converter {
+    final Converter[] children;
+
+
+    StructConverter(IntWritable offset, TypeDescription schema) {
+      children = new Converter[schema.getChildren().size()];
+      int c = 0;
+      for(TypeDescription child: schema.getChildren()) {
+        children[c++] = buildConverter(offset, child);
+      }
+    }
+
+    @Override
+    public void convert(String[] values, VectorizedRowBatch batch, int row) {
+      for(int c=0; c < children.length; ++c) {
+        children[c].convert(values, batch.cols[c], row);
+      }
+    }
+
+    @Override
+    public void convert(String[] values, ColumnVector column, int row) {
+      StructColumnVector cv = (StructColumnVector) column;
+      for(int c=0; c < children.length; ++c) {
+        children[c].convert(values, cv.fields[c], row);
+      }
+    }
+  }
+
+  Converter buildConverter(IntWritable startOffset, TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return new LongConverter(startOffset);
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleConverter(startOffset);
+      case DECIMAL:
+        return new DecimalConverter(startOffset);
+      case BINARY:
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        return new BytesConverter(startOffset);
+      case STRUCT:
+        return new StructConverter(startOffset, schema);
+      default:
+        throw new IllegalArgumentException("Unhandled type " + schema);
+    }
+  }
+}
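
For context, a minimal programmatic sketch of the new reader, mirroring ConvertTool.run() above. The input data and output path are illustrative; passing null for the FSDataInputStream follows the pattern used by TestCsvReader below and only disables getProgress():

    import java.io.StringReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.RecordReader;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;
    import org.apache.orc.tools.convert.CsvReader;

    public class CsvToOrcExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TypeDescription schema =
            TypeDescription.fromString("struct<a:int,b:double,c:string>");
        // Separator ',', quote '"', escape '\', no header lines, "" as null.
        RecordReader reader = new CsvReader(
            new StringReader("1,1.5,hello\n2,2.5,world\n"),
            null, 1, schema, ',', '"', '\\', 0, "");
        Writer writer = OrcFile.createWriter(new Path("example.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));
        VectorizedRowBatch batch = schema.createRowBatch();
        // Copy rows batch by batch, exactly as ConvertTool.run() does.
        while (reader.nextBatch(batch)) {
          writer.addRowBatch(batch);
        }
        reader.close();
        writer.close();
      }
    }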

http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
index 2cc5711..9da5e3a 100644
--- a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
+++ b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -41,6 +42,7 @@ import org.apache.orc.TypeDescription;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.Reader;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.util.List;
@@ -51,7 +53,7 @@ public class JsonReader implements RecordReader {
   private final JsonStreamParser parser;
   private final JsonConverter[] converters;
   private final long totalSize;
-  private final FSDataInputStream rawStream;
+  private final FSDataInputStream input;
   private long rowNumber = 0;
 
   interface JsonConverter {
@@ -234,26 +236,17 @@ public class JsonReader implements RecordReader {
     }
   }
 
-  public JsonReader(Path path,
-                    TypeDescription schema,
-                    Configuration conf) throws IOException {
+  public JsonReader(Reader reader,
+                    FSDataInputStream underlying,
+                    long size,
+                    TypeDescription schema) throws IOException {
     this.schema = schema;
-    FileSystem fs = path.getFileSystem(conf);
-    totalSize = fs.getFileStatus(path).getLen();
-    rawStream = fs.open(path);
-    String name = path.getName();
-    int lastDot = name.lastIndexOf(".");
-    InputStream input = rawStream;
-    if (lastDot >= 0) {
-      if (".gz".equals(name.substring(lastDot))) {
-        input = new GZIPInputStream(rawStream);
-      }
-    }
-    parser = new JsonStreamParser(new InputStreamReader(input,
-        StandardCharsets.UTF_8));
     if (schema.getCategory() != TypeDescription.Category.STRUCT) {
       throw new IllegalArgumentException("Root must be struct - " + schema);
     }
+    this.input = underlying;
+    this.totalSize = size;
+    parser = new JsonStreamParser(reader);
     List<TypeDescription> fieldTypes = schema.getChildren();
     converters = new JsonConverter[fieldTypes.size()];
     for(int c = 0; c < converters.length; ++c) {
@@ -291,12 +284,12 @@ public class JsonReader implements RecordReader {
 
   @Override
   public float getProgress() throws IOException {
-    long pos = rawStream.getPos();
+    long pos = input.getPos();
     return totalSize != 0 && pos < totalSize ? (float) pos / totalSize : 1;
   }
 
   public void close() throws IOException {
-    rawStream.close();
+    input.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
index a426646..66254fe 100644
--- a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
+++ b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
@@ -40,6 +40,7 @@ import java.io.InputStreamReader;
 import java.io.PrintStream;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -246,6 +247,10 @@ public class JsonSchemaFinder {
     } else {
       reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
     }
+    addFile(reader);
+  }
+
+  public void addFile(java.io.Reader reader) throws IOException {
     JsonStreamParser parser = new JsonStreamParser(reader);
     while (parser.hasNext()) {
       records += 1;
@@ -253,6 +258,65 @@ public class JsonSchemaFinder {
     }
   }
 
+  HiveType makeHiveType(TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return new BooleanType();
+      case BYTE:
+        return new NumericType(HiveType.Kind.BYTE, 3, 0);
+      case SHORT:
+        return new NumericType(HiveType.Kind.SHORT, 5, 0);
+      case INT:
+        return new NumericType(HiveType.Kind.INT, 10, 0);
+      case LONG:
+        return new NumericType(HiveType.Kind.LONG, 19, 0);
+      case FLOAT:
+        return new NumericType(HiveType.Kind.FLOAT, 0, 0);
+      case DOUBLE:
+        return new NumericType(HiveType.Kind.DOUBLE, 0, 0);
+      case DECIMAL: {
+        int scale = schema.getScale();
+        int intDigits = schema.getPrecision() - scale;
+        return new NumericType(HiveType.Kind.DECIMAL, intDigits, scale);
+      }
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new StringType(HiveType.Kind.STRING);
+      case TIMESTAMP:
+        return new StringType(HiveType.Kind.TIMESTAMP);
+      case DATE:
+        return new StringType(HiveType.Kind.DATE);
+      case BINARY:
+        return new StringType(HiveType.Kind.BINARY);
+      case LIST:
+        return new ListType(makeHiveType(schema.getChildren().get(0)));
+      case STRUCT: {
+        StructType result = new StructType();
+        List<String> fields = schema.getFieldNames();
+        List<TypeDescription> children = schema.getChildren();
+        for(int i = 0; i < fields.size(); ++i) {
+          result.addField(fields.get(i), makeHiveType(children.get(i)));
+        }
+        return result;
+      }
+      case UNION: {
+        UnionType result = new UnionType();
+        for(TypeDescription child: schema.getChildren()) {
+          result.addType(makeHiveType(child));
+        }
+        return result;
+      }
+      case MAP:
+      default:
+        throw new IllegalArgumentException("Unhandled type " + schema);
+    }
+  }
+
+  public void addSchema(TypeDescription schema) {
+    mergedType = mergeType(mergedType, makeHiveType(schema));
+  }
+
   public TypeDescription getSchema() {
     return mergedType.getSchema();
   }
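
As an aside, a small sketch of the two new JsonSchemaFinder entry points (the literal schema and JSON record are illustrative): addSchema() folds an existing ORC schema into the merged type, as buildSchema() does for Format.ORC, and the new addFile(Reader) overload scans JSON records from any character stream:

    import java.io.StringReader;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.tools.json.JsonSchemaFinder;

    public class SchemaMergeExample {
      public static void main(String[] args) throws Exception {
        JsonSchemaFinder finder = new JsonSchemaFinder();
        // Merge in the schema of an existing ORC file.
        finder.addSchema(TypeDescription.fromString("struct<x:bigint,y:string>"));
        // Scan sample JSON records from a java.io.Reader.
        finder.addFile(new StringReader("{\"x\": 1, \"y\": \"a\", \"z\": true}"));
        // The merged schema covers fields seen in either source.
        System.out.println(finder.getSchema());
      }
    }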

http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java b/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java
new file mode 100644
index 0000000..c70dcd7
--- /dev/null
+++ b/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.tools.convert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.tools.FileDump;
+import org.apache.orc.tools.TestJsonFileDump;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+public class TestCsvReader {
+
+  Configuration conf;
+
+  @Before
+  public void openFileSystem () throws Exception {
+    conf = new Configuration();
+  }
+
+  @Test
+  public void testSimple() throws Exception {
+    StringReader input = new StringReader(
+        "1,1.25,1.01,'a'\n" +
+        "2,2.5,2.02,'14'\n" +
+        "3,3.75,3.03,'1e'\n" +
+        "4,5,4.04,'28'\n" +
+        "5,6.25,5.05,'32'\n" +
+        "6,7.5,6.06,'3c'\n" +
+        "7,8.75,7.07,'46'\n" +
+        "8,10,8.08,'50'\n"
+    );
+    TypeDescription schema = TypeDescription.fromString(
+        "struct<a:int,b:double,c:decimal(10,2),d:string>");
+    RecordReader reader = new CsvReader(input, null, 1, schema, ',', '\'',
+        '\\', 0, "");
+    VectorizedRowBatch batch = schema.createRowBatch(5);
+    assertEquals(true, reader.nextBatch(batch));
+    assertEquals(5, batch.size);
+    for(int r = 0; r < batch.size; ++r) {
+      assertEquals(r+1, ((LongColumnVector) batch.cols[0]).vector[r]);
+      assertEquals(1.25 * (r + 1), ((DoubleColumnVector) batch.cols[1]).vector[r], 0.001);
+      assertEquals((r + 1) + ".0" + (r + 1), ((DecimalColumnVector) batch.cols[2]).vector[r].toFormatString(2));
+      assertEquals(Integer.toHexString((r + 1) * 10), ((BytesColumnVector) batch.cols[3]).toString(r));
+    }
+    assertEquals(true, reader.nextBatch(batch));
+    assertEquals(3, batch.size);
+    for(int r = 0; r < batch.size; ++r) {
+      assertEquals(r + 6, ((LongColumnVector) batch.cols[0]).vector[r]);
+      assertEquals(1.25 * (r + 6), ((DoubleColumnVector) batch.cols[1]).vector[r], 0.001);
+      assertEquals((r + 6) + ".0" + (r + 6), ((DecimalColumnVector) batch.cols[2]).vector[r].toFormatString(2));
+      assertEquals(Integer.toHexString((r + 6) * 10), ((BytesColumnVector) batch.cols[3]).toString(r));
+    }
+    assertEquals(false, reader.nextBatch(batch));
+  }
+
+  @Test
+  public void testNulls() throws Exception {
+    StringReader input = new StringReader(
+        "1,1,1,'a'\n" +
+        "'null','null','null','null'\n" +
+        "3,3,3,'row 3'\n"
+    );
+    TypeDescription schema = TypeDescription.fromString(
+        "struct<a:int,b:double,c:decimal(10,2),d:string>");
+    RecordReader reader = new CsvReader(input, null, 1, schema, ',', '\'',
+        '\\', 0, "null");
+    VectorizedRowBatch batch = schema.createRowBatch();
+    assertEquals(true, reader.nextBatch(batch));
+    assertEquals(3, batch.size);
+    for(int c=0; c < 4; ++c) {
+      assertEquals("column " + c, false, batch.cols[c].noNulls);
+    }
+
+    // check row 0
+    assertEquals(1, ((LongColumnVector) batch.cols[0]).vector[0]);
+    assertEquals(1, ((DoubleColumnVector) batch.cols[1]).vector[0], 0.001);
+    assertEquals("1", ((DecimalColumnVector) batch.cols[2]).vector[0].toString());
+    assertEquals("a", ((BytesColumnVector) batch.cols[3]).toString(0));
+    for(int c=0; c < 4; ++c) {
+      assertEquals("column " + c, false, batch.cols[c].isNull[0]);
+    }
+
+    // row 1
+    for(int c=0; c < 4; ++c) {
+      assertEquals("column " + c, true, batch.cols[c].isNull[1]);
+    }
+
+    // check row 2
+    assertEquals(3, ((LongColumnVector) batch.cols[0]).vector[2]);
+    assertEquals(3, ((DoubleColumnVector) batch.cols[1]).vector[2], 0.001);
+    assertEquals("3", ((DecimalColumnVector) batch.cols[2]).vector[2].toString());
+    assertEquals("row 3", ((BytesColumnVector) batch.cols[3]).toString(2));
+    for(int c=0; c < 4; ++c) {
+      assertEquals("column " + c, false, batch.cols[c].isNull[2]);
+    }
+  }
+
+  @Test
+  public void testStructs() throws Exception {
+    StringReader input = new StringReader(
+        "1,2,3,4\n" +
+        "5,6,7,8\n"
+    );
+    TypeDescription schema = TypeDescription.fromString(
+        "struct<a:int,b:struct<c:int,d:int>,e:int>");
+    RecordReader reader = new CsvReader(input, null, 1, schema, ',', '\'',
+        '\\', 0, "null");
+    VectorizedRowBatch batch = schema.createRowBatch();
+    assertEquals(true, reader.nextBatch(batch));
+    assertEquals(2, batch.size);
+    int nextVal = 1;
+    for(int r=0; r < 2; ++r) {
+      assertEquals("row " + r, nextVal++, ((LongColumnVector) batch.cols[0]).vector[r]);
+      StructColumnVector b = (StructColumnVector) batch.cols[1];
+      assertEquals("row " + r, nextVal++, ((LongColumnVector) b.fields[0]).vector[r]);
+      assertEquals("row " + r, nextVal++, ((LongColumnVector) b.fields[1]).vector[r]);
+      assertEquals("row " + r, nextVal++, ((LongColumnVector) batch.cols[2]).vector[r]);
+    }
+    assertEquals(false, reader.nextBatch(batch));
+  }
+}