Posted to commits@orc.apache.org by om...@apache.org on 2017/07/17 18:25:19 UTC
orc git commit: ORC-199. Add convert from CSV.
Repository: orc
Updated Branches:
refs/heads/master 96a4ea968 -> 7e5d2cc0f
ORC-199. Add convert from CSV.
Fixes #131.
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/7e5d2cc0
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/7e5d2cc0
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/7e5d2cc0
Branch: refs/heads/master
Commit: 7e5d2cc0f9392a2434f6a3de3994c031fdd85c1b
Parents: 96a4ea9
Author: Owen O'Malley <om...@apache.org>
Authored: Tue May 23 13:54:44 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Jul 17 11:23:44 2017 -0700
----------------------------------------------------------------------
 java/pom.xml                                     |   5 +
 java/tools/pom.xml                               |   4 +
 .../src/java/org/apache/orc/tools/Driver.java    |   2 +-
 .../apache/orc/tools/convert/ConvertTool.java    | 197 +++++++++++++--
 .../org/apache/orc/tools/convert/CsvReader.java  | 246 +++++++++++++++++++
 .../apache/orc/tools/convert/JsonReader.java     |  31 +--
 .../apache/orc/tools/json/JsonSchemaFinder.java  |  64 +++++
 .../apache/orc/tools/convert/TestCsvReader.java  | 179 ++++++++++++++
 8 files changed, 691 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
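[Editor's note: for context on what the new tool accepts. In addition to --schema (-s) and
--output (-o), the convert command now takes the CSV-specific flags --null (-n), --quote (-q),
--escape (-e), --separator (-S), and --header (-H); see parseOptions in the ConvertTool diff
below. A hypothetical invocation, assuming the orc-tools uber jar (the exact jar name varies
by version):

    % java -jar orc-tools-*.jar convert data.csv \
        --schema 'struct<a:int,b:double,c:string>' \
        --header 1 --null 'NA' --output data.orc

CSV inputs require an explicit --schema; as buildSchema below shows, schema discovery only
runs over JSON and ORC inputs.]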
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 25c62b1..879c6f0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -342,6 +342,11 @@
<version>2.5.0</version>
</dependency>
<dependency>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ <version>3.9</version>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.3.1</version>
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/tools/pom.xml b/java/tools/pom.xml
index 6114f07..6366e1e 100644
--- a/java/tools/pom.xml
+++ b/java/tools/pom.xml
@@ -43,6 +43,10 @@
<artifactId>gson</artifactId>
</dependency>
<dependency>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/Driver.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/Driver.java b/java/tools/src/java/org/apache/orc/tools/Driver.java
index 9bba013..ae8f5e1 100644
--- a/java/tools/src/java/org/apache/orc/tools/Driver.java
+++ b/java/tools/src/java/org/apache/orc/tools/Driver.java
@@ -89,7 +89,7 @@ public class Driver {
System.err.println(" meta - print the metadata about the ORC file");
System.err.println(" data - print the data from the ORC file");
System.err.println(" scan - scan the ORC file");
- System.err.println(" convert - convert JSON files to ORC");
+ System.err.println(" convert - convert CSV and JSON files to ORC");
System.err.println(" json-schema - scan JSON files to determine their schema");
System.err.println();
System.err.println("To get more help, provide -h to the command");
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java b/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
index 6211bd8..33f1cf5 100644
--- a/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
+++ b/java/tools/src/java/org/apache/orc/tools/convert/ConvertTool.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,53 +18,185 @@
package org.apache.orc.tools.convert;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.tools.json.JsonSchemaFinder;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
/**
- * A conversion tool to convert JSON files into ORC files.
+ * A conversion tool to convert CSV or JSON files into ORC files.
*/
public class ConvertTool {
+ private final List<FileInformation> fileList;
+ private final TypeDescription schema;
+ private final char csvSeparator;
+ private final char csvQuote;
+ private final char csvEscape;
+ private final int csvHeaderLines;
+ private final String csvNullString;
+ private final Writer writer;
+ private final VectorizedRowBatch batch;
- static TypeDescription computeSchema(String[] filename) throws IOException {
+ TypeDescription buildSchema(List<FileInformation> files,
+ Configuration conf) throws IOException {
JsonSchemaFinder schemaFinder = new JsonSchemaFinder();
- for(String file: filename) {
- System.err.println("Scanning " + file + " for schema");
- schemaFinder.addFile(file);
+ int filesScanned = 0;
+ for(FileInformation file: files) {
+ if (file.format == Format.JSON) {
+ System.err.println("Scanning " + file.path + " for schema");
+ filesScanned += 1;
+ schemaFinder.addFile(file.getReader(file.filesystem.open(file.path)));
+ } else if (file.format == Format.ORC) {
+ System.err.println("Merging schema from " + file.path);
+ filesScanned += 1;
+ Reader reader = OrcFile.createReader(file.path,
+ OrcFile.readerOptions(conf)
+ .filesystem(file.filesystem));
+ schemaFinder.addSchema(reader.getSchema());
+ }
+ }
+ if (filesScanned == 0) {
+ throw new IllegalArgumentException("Please specify a schema using" +
+ " --schema for converting CSV files.");
}
return schemaFinder.getSchema();
}
+ enum Compression {
+ NONE, GZIP
+ }
+
+ enum Format {
+ JSON, CSV, ORC
+ }
+
+ class FileInformation {
+ private final Compression compression;
+ private final Format format;
+ private final Path path;
+ private final FileSystem filesystem;
+ private final Configuration conf;
+ private final long size;
+
+ FileInformation(Path path, Configuration conf) throws IOException {
+ this.path = path;
+ this.conf = conf;
+ this.filesystem = path.getFileSystem(conf);
+ this.size = filesystem.getFileStatus(path).getLen();
+ String name = path.getName();
+ int lastDot = name.lastIndexOf(".");
+ if (lastDot >= 0 && ".gz".equals(name.substring(lastDot))) {
+ this.compression = Compression.GZIP;
+ name = name.substring(0, lastDot);
+ lastDot = name.lastIndexOf(".");
+ } else {
+ this.compression = Compression.NONE;
+ }
+ if (lastDot >= 0) {
+ String ext = name.substring(lastDot);
+ if (".json".equals(ext) || ".jsn".equals(ext)) {
+ format = Format.JSON;
+ } else if (".csv".equals(ext)) {
+ format = Format.CSV;
+ } else if (".orc".equals(ext)) {
+ format = Format.ORC;
+ } else {
+ throw new IllegalArgumentException("Unknown kind of file " + path);
+ }
+ } else {
+ throw new IllegalArgumentException("No extension on file " + path);
+ }
+ }
+
+ java.io.Reader getReader(InputStream input) throws IOException {
+ if (compression == Compression.GZIP) {
+ input = new GZIPInputStream(input);
+ }
+ return new InputStreamReader(input, StandardCharsets.UTF_8);
+ }
+
+ public RecordReader getRecordReader() throws IOException {
+ switch (format) {
+ case ORC: {
+ Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ return reader.rows(reader.options().schema(schema));
+ }
+ case JSON: {
+ FSDataInputStream underlying = filesystem.open(path);
+ return new JsonReader(getReader(underlying), underlying, size, schema);
+ }
+ case CSV: {
+ FSDataInputStream underlying = filesystem.open(path);
+ return new CsvReader(getReader(underlying), underlying, size, schema,
+ csvSeparator, csvQuote, csvEscape, csvHeaderLines, csvNullString);
+ }
+ default:
+ throw new IllegalArgumentException("Unhandled format " + format +
+ " for " + path);
+ }
+ }
+ }
+
public static void main(Configuration conf,
String[] args) throws IOException, ParseException {
+ new ConvertTool(conf, args).run();
+ }
+
+
+ List<FileInformation> buildFileList(String[] files,
+ Configuration conf) throws IOException {
+ List<FileInformation> result = new ArrayList<>(files.length);
+ for(String fn: files) {
+ result.add(new FileInformation(new Path(fn), conf));
+ }
+ return result;
+ }
+
+ public ConvertTool(Configuration conf,
+ String[] args) throws IOException, ParseException {
CommandLine opts = parseOptions(args);
- TypeDescription schema;
+ fileList = buildFileList(opts.getArgs(), conf);
if (opts.hasOption('s')) {
- schema = TypeDescription.fromString(opts.getOptionValue('s'));
+ this.schema = TypeDescription.fromString(opts.getOptionValue('s'));
} else {
- schema = computeSchema(opts.getArgs());
+ this.schema = buildSchema(fileList, conf);
}
+ this.csvQuote = getCharOption(opts, 'q', '"');
+ this.csvEscape = getCharOption(opts, 'e', '\\');
+ this.csvSeparator = getCharOption(opts, 'S', ',');
+ this.csvHeaderLines = getIntOption(opts, 'H', 0);
+ this.csvNullString = opts.getOptionValue('n', "");
String outFilename = opts.hasOption('o')
? opts.getOptionValue('o') : "output.orc";
- Writer writer = OrcFile.createWriter(new Path(outFilename),
+ writer = OrcFile.createWriter(new Path(outFilename),
OrcFile.writerOptions(conf).setSchema(schema));
- VectorizedRowBatch batch = schema.createRowBatch();
- for (String file: opts.getArgs()) {
- System.err.println("Processing " + file);
- RecordReader reader = new JsonReader(new Path(file), schema, conf);
+ batch = schema.createRowBatch();
+ }
+
+ void run() throws IOException {
+ for (FileInformation file: fileList) {
+ System.err.println("Processing " + file.path);
+ RecordReader reader = file.getRecordReader();
while (reader.nextBatch(batch)) {
writer.addRowBatch(batch);
}
@@ -73,7 +205,23 @@ public class ConvertTool {
writer.close();
}
- static CommandLine parseOptions(String[] args) throws ParseException {
+ private static int getIntOption(CommandLine opts, char letter, int mydefault) {
+ if (opts.hasOption(letter)) {
+ return Integer.parseInt(opts.getOptionValue(letter));
+ } else {
+ return mydefault;
+ }
+ }
+
+ private static char getCharOption(CommandLine opts, char letter, char mydefault) {
+ if (opts.hasOption(letter)) {
+ return opts.getOptionValue(letter).charAt(0);
+ } else {
+ return mydefault;
+ }
+ }
+
+ private static CommandLine parseOptions(String[] args) throws ParseException {
Options options = new Options();
options.addOption(
@@ -84,7 +232,22 @@ public class ConvertTool {
options.addOption(
Option.builder("o").longOpt("output").desc("Output filename")
.hasArg().build());
- CommandLine cli = new GnuParser().parse(options, args);
+ options.addOption(
+ Option.builder("n").longOpt("null").desc("CSV null string")
+ .hasArg().build());
+ options.addOption(
+ Option.builder("q").longOpt("quote").desc("CSV quote character")
+ .hasArg().build());
+ options.addOption(
+ Option.builder("e").longOpt("escape").desc("CSV escape character")
+ .hasArg().build());
+ options.addOption(
+ Option.builder("S").longOpt("separator").desc("CSV separator character")
+ .hasArg().build());
+ options.addOption(
+ Option.builder("H").longOpt("header").desc("CSV header lines")
+ .hasArg().build());
+ CommandLine cli = new DefaultParser().parse(options, args);
if (cli.hasOption('h') || cli.getArgs().length == 0) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("convert", options);
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
new file mode 100644
index 0000000..3aa6f1a
--- /dev/null
+++ b/java/tools/src/java/org/apache/orc/tools/convert/CsvReader.java
@@ -0,0 +1,246 @@
+package org.apache.orc.tools.convert;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import com.opencsv.CSVReader;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class CsvReader implements RecordReader {
+
+ private long rowNumber = 0;
+ private final Converter converter;
+ private final int columns;
+ private final CSVReader reader;
+ private final String nullString;
+ private final FSDataInputStream underlying;
+ private final long totalSize;
+
+ /**
+ * Create a CSV reader
+ * @param reader the stream to read from
+ * @param input the underlying stream, used only to report the
+ * current position within the file
+ * @param size the number of bytes in the underlying stream
+ * @param schema the schema to read into
+ * @param separatorChar the character between fields
+ * @param quoteChar the quote character
+ * @param escapeChar the escape character
+ * @param headerLines the number of header lines
+ * @param nullString the string that is translated to null
+ * @throws IOException
+ */
+ public CsvReader(java.io.Reader reader,
+ FSDataInputStream input,
+ long size,
+ TypeDescription schema,
+ char separatorChar,
+ char quoteChar,
+ char escapeChar,
+ int headerLines,
+ String nullString) throws IOException {
+ this.underlying = input;
+ this.reader = new CSVReader(reader, separatorChar, quoteChar, escapeChar,
+ headerLines);
+ this.nullString = nullString;
+ this.totalSize = size;
+ IntWritable nextColumn = new IntWritable(0);
+ this.converter = buildConverter(nextColumn, schema);
+ this.columns = nextColumn.get();
+ }
+
+ interface Converter {
+ void convert(String[] values, VectorizedRowBatch batch, int row);
+ void convert(String[] values, ColumnVector column, int row);
+ }
+
+ @Override
+ public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+ batch.reset();
+ final int BATCH_SIZE = batch.getMaxSize();
+ String[] nextLine;
+ // Read the CSV rows and place them into the column vectors.
+ while ((nextLine = reader.readNext()) != null) {
+ rowNumber++;
+ if (nextLine.length != columns &&
+ !(nextLine.length == columns + 1 && "".equals(nextLine[columns]))) {
+ throw new IllegalArgumentException("Too many columns on line " +
+ rowNumber + ". Expected " + columns + ", but got " +
+ nextLine.length + ".");
+ }
+ converter.convert(nextLine, batch, batch.size++);
+ if (batch.size == BATCH_SIZE) {
+ break;
+ }
+ }
+ return batch.size != 0;
+ }
+
+ @Override
+ public long getRowNumber() throws IOException {
+ return rowNumber;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ long pos = underlying.getPos();
+ return totalSize != 0 && pos < totalSize ? (float) pos / totalSize : 1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public void seekToRow(long rowCount) throws IOException {
+ throw new UnsupportedOperationException("Seeking not supported");
+ }
+
+ abstract class ConverterImpl implements Converter {
+ final int offset;
+
+ ConverterImpl(IntWritable offset) {
+ this.offset = offset.get();
+ offset.set(this.offset + 1);
+ }
+
+ @Override
+ public void convert(String[] values, VectorizedRowBatch batch, int row) {
+ convert(values, batch.cols[0], row);
+ }
+ }
+
+ class LongConverter extends ConverterImpl {
+ LongConverter(IntWritable offset) {
+ super(offset);
+ }
+
+ @Override
+ public void convert(String[] values, ColumnVector column, int row) {
+ if (values[offset] == null || nullString.equals(values[offset])) {
+ column.noNulls = false;
+ column.isNull[row] = true;
+ } else {
+ ((LongColumnVector) column).vector[row] =
+ Long.parseLong(values[offset]);
+ }
+ }
+ }
+
+ class DoubleConverter extends ConverterImpl {
+ DoubleConverter(IntWritable offset) {
+ super(offset);
+ }
+
+ @Override
+ public void convert(String[] values, ColumnVector column, int row) {
+ if (values[offset] == null || nullString.equals(values[offset])) {
+ column.noNulls = false;
+ column.isNull[row] = true;
+ } else {
+ ((DoubleColumnVector) column).vector[row] =
+ Double.parseDouble(values[offset]);
+ }
+ }
+ }
+
+ class DecimalConverter extends ConverterImpl {
+ DecimalConverter(IntWritable offset) {
+ super(offset);
+ }
+
+ @Override
+ public void convert(String[] values, ColumnVector column, int row) {
+ if (values[offset] == null || nullString.equals(values[offset])) {
+ column.noNulls = false;
+ column.isNull[row] = true;
+ } else {
+ ((DecimalColumnVector) column).vector[row].set(
+ new HiveDecimalWritable(values[offset]));
+ }
+ }
+ }
+
+ class BytesConverter extends ConverterImpl {
+ BytesConverter(IntWritable offset) {
+ super(offset);
+ }
+
+ @Override
+ public void convert(String[] values, ColumnVector column, int row) {
+ if (values[offset] == null || nullString.equals(values[offset])) {
+ column.noNulls = false;
+ column.isNull[row] = true;
+ } else {
+ byte[] value = values[offset].getBytes(StandardCharsets.UTF_8);
+ ((BytesColumnVector) column).setRef(row, value, 0, value.length);
+ }
+ }
+ }
+
+ class StructConverter implements Converter {
+ final Converter[] children;
+
+
+ StructConverter(IntWritable offset, TypeDescription schema) {
+ children = new Converter[schema.getChildren().size()];
+ int c = 0;
+ for(TypeDescription child: schema.getChildren()) {
+ children[c++] = buildConverter(offset, child);
+ }
+ }
+
+ @Override
+ public void convert(String[] values, VectorizedRowBatch batch, int row) {
+ for(int c=0; c < children.length; ++c) {
+ children[c].convert(values, batch.cols[c], row);
+ }
+ }
+
+ @Override
+ public void convert(String[] values, ColumnVector column, int row) {
+ StructColumnVector cv = (StructColumnVector) column;
+ for(int c=0; c < children.length; ++c) {
+ children[c].convert(values, cv.fields[c], row);
+ }
+ }
+ }
+
+ Converter buildConverter(IntWritable startOffset, TypeDescription schema) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ return new LongConverter(startOffset);
+ case FLOAT:
+ case DOUBLE:
+ return new DoubleConverter(startOffset);
+ case DECIMAL:
+ return new DecimalConverter(startOffset);
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return new BytesConverter(startOffset);
+ case STRUCT:
+ return new StructConverter(startOffset, schema);
+ default:
+ throw new IllegalArgumentException("Unhandled type " + schema);
+ }
+ }
+}
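[Editor's note: for orientation, a minimal sketch of driving the new CsvReader directly,
mirroring the pattern in TestCsvReader at the end of this commit. Passing null for the
underlying stream and a dummy size of 1 is safe as long as getProgress() is never called:

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.RecordReader;
    import org.apache.orc.TypeDescription;

    TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
    // Read two rows of CSV from an in-memory string.
    RecordReader rows = new CsvReader(new java.io.StringReader("1,hi\n2,bye\n"),
        null, 1, schema, ',', '"', '\\', 0, "");
    VectorizedRowBatch batch = schema.createRowBatch();
    while (rows.nextBatch(batch)) {
      // batch.cols[0] is a LongColumnVector, batch.cols[1] a BytesColumnVector.
    }
    rows.close();]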
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
index 2cc5711..9da5e3a 100644
--- a/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
+++ b/java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -41,6 +42,7 @@ import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.List;
@@ -51,7 +53,7 @@ public class JsonReader implements RecordReader {
private final JsonStreamParser parser;
private final JsonConverter[] converters;
private final long totalSize;
- private final FSDataInputStream rawStream;
+ private final FSDataInputStream input;
private long rowNumber = 0;
interface JsonConverter {
@@ -234,26 +236,17 @@ public class JsonReader implements RecordReader {
}
}
- public JsonReader(Path path,
- TypeDescription schema,
- Configuration conf) throws IOException {
+ public JsonReader(Reader reader,
+ FSDataInputStream underlying,
+ long size,
+ TypeDescription schema) throws IOException {
this.schema = schema;
- FileSystem fs = path.getFileSystem(conf);
- totalSize = fs.getFileStatus(path).getLen();
- rawStream = fs.open(path);
- String name = path.getName();
- int lastDot = name.lastIndexOf(".");
- InputStream input = rawStream;
- if (lastDot >= 0) {
- if (".gz".equals(name.substring(lastDot))) {
- input = new GZIPInputStream(rawStream);
- }
- }
- parser = new JsonStreamParser(new InputStreamReader(input,
- StandardCharsets.UTF_8));
if (schema.getCategory() != TypeDescription.Category.STRUCT) {
throw new IllegalArgumentException("Root must be struct - " + schema);
}
+ this.input = underlying;
+ this.totalSize = size;
+ parser = new JsonStreamParser(reader);
List<TypeDescription> fieldTypes = schema.getChildren();
converters = new JsonConverter[fieldTypes.size()];
for(int c = 0; c < converters.length; ++c) {
@@ -291,12 +284,12 @@ public class JsonReader implements RecordReader {
@Override
public float getProgress() throws IOException {
- long pos = rawStream.getPos();
+ long pos = input.getPos();
return totalSize != 0 && pos < totalSize ? (float) pos / totalSize : 1;
}
public void close() throws IOException {
- rawStream.close();
+ input.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
----------------------------------------------------------------------
diff --git a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
index a426646..66254fe 100644
--- a/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
+++ b/java/tools/src/java/org/apache/orc/tools/json/JsonSchemaFinder.java
@@ -40,6 +40,7 @@ import java.io.InputStreamReader;
import java.io.PrintStream;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -246,6 +247,10 @@ public class JsonSchemaFinder {
} else {
reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
}
+ addFile(reader);
+ }
+
+ public void addFile(java.io.Reader reader) throws IOException {
JsonStreamParser parser = new JsonStreamParser(reader);
while (parser.hasNext()) {
records += 1;
@@ -253,6 +258,65 @@ public class JsonSchemaFinder {
}
}
+ HiveType makeHiveType(TypeDescription schema) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanType();
+ case BYTE:
+ return new NumericType(HiveType.Kind.BYTE, 3, 0);
+ case SHORT:
+ return new NumericType(HiveType.Kind.SHORT, 5, 0);
+ case INT:
+ return new NumericType(HiveType.Kind.INT, 10, 0);
+ case LONG:
+ return new NumericType(HiveType.Kind.LONG, 19, 0);
+ case FLOAT:
+ return new NumericType(HiveType.Kind.FLOAT, 0, 0);
+ case DOUBLE:
+ return new NumericType(HiveType.Kind.DOUBLE, 0, 0);
+ case DECIMAL: {
+ int scale = schema.getScale();
+ int intDigits = schema.getPrecision() - scale;
+ return new NumericType(HiveType.Kind.DECIMAL, intDigits, scale);
+ }
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return new StringType(HiveType.Kind.STRING);
+ case TIMESTAMP:
+ return new StringType(HiveType.Kind.TIMESTAMP);
+ case DATE:
+ return new StringType(HiveType.Kind.DATE);
+ case BINARY:
+ return new StringType(HiveType.Kind.BINARY);
+ case LIST:
+ return new ListType(makeHiveType(schema.getChildren().get(0)));
+ case STRUCT: {
+ StructType result = new StructType();
+ List<String> fields = schema.getFieldNames();
+ List<TypeDescription> children = schema.getChildren();
+ for(int i = 0; i < fields.size(); ++i) {
+ result.addField(fields.get(i), makeHiveType(children.get(i)));
+ }
+ return result;
+ }
+ case UNION: {
+ UnionType result = new UnionType();
+ for(TypeDescription child: schema.getChildren()) {
+ result.addType(makeHiveType(child));
+ }
+ return result;
+ }
+ case MAP:
+ default:
+ throw new IllegalArgumentException("Unhandled type " + schema);
+ }
+ }
+
+ public void addSchema(TypeDescription schema) {
+ mergedType = mergeType(mergedType, makeHiveType(schema));
+ }
+
public TypeDescription getSchema() {
return mergedType.getSchema();
}
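[Editor's note: the new addSchema entry point lets the schemas of existing ORC files
participate in the same merge as sampled JSON records. A hypothetical sketch; widening
field a from the sampled integer to bigint is the expected merge result, not something
verified in this commit:

    import org.apache.orc.TypeDescription;
    import org.apache.orc.tools.json.JsonSchemaFinder;

    JsonSchemaFinder finder = new JsonSchemaFinder();
    // Sample one JSON record: a is a small integer, b is a string.
    finder.addFile(new java.io.StringReader("{\"a\": 1, \"b\": \"x\"}"));
    // Fold in the schema of an existing ORC file (here built from a string).
    finder.addSchema(TypeDescription.fromString("struct<a:bigint>"));
    TypeDescription merged = finder.getSchema();
    // expected: struct<a:bigint,b:string>]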
http://git-wip-us.apache.org/repos/asf/orc/blob/7e5d2cc0/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java b/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java
new file mode 100644
index 0000000..c70dcd7
--- /dev/null
+++ b/java/tools/src/test/org/apache/orc/tools/convert/TestCsvReader.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.tools.convert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.tools.FileDump;
+import org.apache.orc.tools.TestJsonFileDump;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+public class TestCsvReader {
+
+ Configuration conf;
+
+ @Before
+ public void openFileSystem () throws Exception {
+ conf = new Configuration();
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ StringReader input = new StringReader(
+ "1,1.25,1.01,'a'\n" +
+ "2,2.5,2.02,'14'\n" +
+ "3,3.75,3.03,'1e'\n" +
+ "4,5,4.04,'28'\n" +
+ "5,6.25,5.05,'32'\n" +
+ "6,7.5,6.06,'3c'\n" +
+ "7,8.75,7.07,'46'\n" +
+ "8,10,8.08,'50'\n"
+ );
+ TypeDescription schema = TypeDescription.fromString(
+ "struct<a:int,b:double,c:decimal(10,2),d:string>");
+ RecordReader reader = new CsvReader(input, null, 1, schema, ',', '\'',
+ '\\', 0, "");
+ VectorizedRowBatch batch = schema.createRowBatch(5);
+ assertEquals(true, reader.nextBatch(batch));
+ assertEquals(5, batch.size);
+ for(int r = 0; r < batch.size; ++r) {
+ assertEquals(r+1, ((LongColumnVector) batch.cols[0]).vector[r]);
+ assertEquals(1.25 * (r + 1), ((DoubleColumnVector) batch.cols[1]).vector[r], 0.001);
+ assertEquals((r + 1) + ".0" + (r + 1), ((DecimalColumnVector) batch.cols[2]).vector[r].toFormatString(2));
+ assertEquals(Integer.toHexString((r + 1) * 10), ((BytesColumnVector) batch.cols[3]).toString(r));
+ }
+ assertEquals(true, reader.nextBatch(batch));
+ assertEquals(3, batch.size);
+ for(int r = 0; r < batch.size; ++r) {
+ assertEquals(r + 6, ((LongColumnVector) batch.cols[0]).vector[r]);
+ assertEquals(1.25 * (r + 6), ((DoubleColumnVector) batch.cols[1]).vector[r], 0.001);
+ assertEquals((r + 6) + ".0" + (r + 6), ((DecimalColumnVector) batch.cols[2]).vector[r].toFormatString(2));
+ assertEquals(Integer.toHexString((r + 6) * 10), ((BytesColumnVector) batch.cols[3]).toString(r));
+ }
+ assertEquals(false, reader.nextBatch(batch));
+ }
+
+ @Test
+ public void testNulls() throws Exception {
+ StringReader input = new StringReader(
+ "1,1,1,'a'\n" +
+ "'null','null','null','null'\n" +
+ "3,3,3,'row 3'\n"
+ );
+ TypeDescription schema = TypeDescription.fromString(
+ "struct<a:int,b:double,c:decimal(10,2),d:string>");
+ RecordReader reader = new CsvReader(input, null, 1, schema, ',', '\'',
+ '\\', 0, "null");
+ VectorizedRowBatch batch = schema.createRowBatch();
+ assertEquals(true, reader.nextBatch(batch));
+ assertEquals(3, batch.size);
+ for(int c=0; c < 4; ++c) {
+ assertEquals("column " + c, false, batch.cols[c].noNulls);
+ }
+
+ // check row 0
+ assertEquals(1, ((LongColumnVector) batch.cols[0]).vector[0]);
+ assertEquals(1, ((DoubleColumnVector) batch.cols[1]).vector[0], 0.001);
+ assertEquals("1", ((DecimalColumnVector) batch.cols[2]).vector[0].toString());
+ assertEquals("a", ((BytesColumnVector) batch.cols[3]).toString(0));
+ for(int c=0; c < 4; ++c) {
+ assertEquals("column " + c, false, batch.cols[c].isNull[0]);
+ }
+
+ // row 1
+ for(int c=0; c < 4; ++c) {
+ assertEquals("column " + c, true, batch.cols[c].isNull[1]);
+ }
+
+ // check row 2
+ assertEquals(3, ((LongColumnVector) batch.cols[0]).vector[2]);
+ assertEquals(3, ((DoubleColumnVector) batch.cols[1]).vector[2], 0.001);
+ assertEquals("3", ((DecimalColumnVector) batch.cols[2]).vector[2].toString());
+ assertEquals("row 3", ((BytesColumnVector) batch.cols[3]).toString(2));
+ for(int c=0; c < 4; ++c) {
+ assertEquals("column " + c, false, batch.cols[c].isNull[2]);
+ }
+ }
+
+ @Test
+ public void testStructs() throws Exception {
+ StringReader input = new StringReader(
+ "1,2,3,4\n" +
+ "5,6,7,8\n"
+ );
+ TypeDescription schema = TypeDescription.fromString(
+ "struct<a:int,b:struct<c:int,d:int>,e:int>");
+ RecordReader reader = new CsvReader(input, null, 1, schema, ',', '\'',
+ '\\', 0, "null");
+ VectorizedRowBatch batch = schema.createRowBatch();
+ assertEquals(true, reader.nextBatch(batch));
+ assertEquals(2, batch.size);
+ int nextVal = 1;
+ for(int r=0; r < 2; ++r) {
+ assertEquals("row " + r, nextVal++, ((LongColumnVector) batch.cols[0]).vector[r]);
+ StructColumnVector b = (StructColumnVector) batch.cols[1];
+ assertEquals("row " + r, nextVal++, ((LongColumnVector) b.fields[0]).vector[r]);
+ assertEquals("row " + r, nextVal++, ((LongColumnVector) b.fields[1]).vector[r]);
+ assertEquals("row " + r, nextVal++, ((LongColumnVector) batch.cols[2]).vector[r]);
+ }
+ assertEquals(false, reader.nextBatch(batch));
+ }
+}