You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/10/17 18:25:11 UTC
[09/15] orc git commit: more updates
more updates
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/5b37113b
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/5b37113b
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/5b37113b
Branch: refs/heads/orc-72
Commit: 5b37113b73eb0e12744f2711326e11cd2ef6eaef
Parents: 86628bc
Author: Owen O'Malley <om...@apache.org>
Authored: Mon Oct 3 10:01:40 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Oct 10 13:59:16 2016 -0700
----------------------------------------------------------------------
java/bench/pom.xml | 4 +
.../src/java/org/apache/orc/bench/AvroScan.java | 47 ---
.../org/apache/orc/bench/AvroSchemaUtils.java | 190 ----------
.../java/org/apache/orc/bench/AvroWriter.java | 375 -------------------
.../java/org/apache/orc/bench/CsvReader.java | 175 ---------
.../src/java/org/apache/orc/bench/CsvScan.java | 40 --
.../java/org/apache/orc/bench/GithubToAvro.java | 2 +
.../java/org/apache/orc/bench/GithubToJson.java | 2 +-
.../java/org/apache/orc/bench/GithubToOrc.java | 4 +-
.../org/apache/orc/bench/GithubToParquet.java | 2 +
.../java/org/apache/orc/bench/JsonReader.java | 278 --------------
.../src/java/org/apache/orc/bench/JsonScan.java | 61 ---
.../src/java/org/apache/orc/bench/OrcScan.java | 46 ---
.../java/org/apache/orc/bench/ParquetScan.java | 54 ---
.../java/org/apache/orc/bench/SalesToAvro.java | 1 +
.../org/apache/orc/bench/SalesToParquet.java | 1 +
.../java/org/apache/orc/bench/TaxiToAvro.java | 2 +
.../java/org/apache/orc/bench/TaxiToJson.java | 1 +
.../java/org/apache/orc/bench/TaxiToOrc.java | 1 +
.../org/apache/orc/bench/TaxiToParquet.java | 2 +
.../org/apache/orc/bench/avro/AvroScan.java | 47 +++
.../apache/orc/bench/avro/AvroSchemaUtils.java | 190 ++++++++++
.../org/apache/orc/bench/avro/AvroWriter.java | 375 +++++++++++++++++++
.../org/apache/orc/bench/csv/CsvReader.java | 175 +++++++++
.../java/org/apache/orc/bench/csv/CsvScan.java | 41 ++
.../org/apache/orc/bench/json/JsonReader.java | 278 ++++++++++++++
.../org/apache/orc/bench/json/JsonScan.java | 61 +++
.../java/org/apache/orc/bench/orc/OrcScan.java | 46 +++
.../apache/orc/bench/parquet/ParquetScan.java | 54 +++
java/pom.xml | 15 +-
30 files changed, 1295 insertions(+), 1275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/pom.xml
----------------------------------------------------------------------
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index f0bf55a..f40f21b 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -67,6 +67,10 @@
<artifactId>hive-storage-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
deleted file mode 100644
index 61f6a62..0000000
--- a/java/bench/src/java/org/apache/orc/bench/AvroScan.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Command-line helper that scans Avro data files and counts their rows.
- */
-public class AvroScan {
-  /**
-   * Reads every record of each named Avro file and prints the total number
-   * of rows read across all of them.
-   *
-   * @param args paths of the Avro files to scan
-   * @throws Exception if a file cannot be opened or read
-   */
-  public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    long rowCount = 0;
-    for (String filename : args) {
-      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-      // Close the input and the reader after each file so that scanning many
-      // files does not accumulate open handles.
-      try (FsInput file = new FsInput(new Path(filename), conf);
-           DataFileReader<GenericRecord> dataFileReader =
-               new DataFileReader<>(file, datumReader)) {
-        GenericRecord record = null;
-        while (dataFileReader.hasNext()) {
-          // Hand the previous record back to the reader on each call.
-          record = dataFileReader.next(record);
-          rowCount += 1;
-        }
-      }
-    }
-    System.out.println("Rows read: " + rowCount);
-  }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
deleted file mode 100644
index 02931c3..0000000
--- a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.bench;
-
-import org.apache.avro.Schema;
-import org.apache.orc.TypeDescription;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Convert an ORC TypeDescription to an Avro Schema.
- */
-public class AvroSchemaUtils {
-
-  private AvroSchemaUtils() {
-    // No instances
-  }
-
-  /**
-   * Build the Avro schema that corresponds to an ORC type.
-   *
-   * The result is always nullable: whatever schema is produced for the type
-   * is wrapped in a union whose first branch is null.
-   *
-   * @param typeInfo the ORC type to convert
-   * @return the Avro schema for the type, wrapped in a union with null
-   * @throws UnsupportedOperationException if the type's category has no
-   *         mapping, or if a map type has a non-string key
-   */
-  public static Schema createAvroSchema(TypeDescription typeInfo) {
-    Schema schema;
-    switch (typeInfo.getCategory()) {
-      case STRING:
-        schema = Schema.create(Schema.Type.STRING);
-        break;
-      case CHAR:
-        schema = getSchemaFor("{" +
-            "\"type\":\"string\"," +
-            "\"logicalType\":\"char\"," +
-            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
-        break;
-      case VARCHAR:
-        schema = getSchemaFor("{" +
-            "\"type\":\"string\"," +
-            "\"logicalType\":\"varchar\"," +
-            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
-        break;
-      case BINARY:
-        schema = Schema.create(Schema.Type.BYTES);
-        break;
-      case BYTE:
-      case SHORT:
-      case INT:
-        // All of the narrow integer categories map to the Avro int type.
-        schema = Schema.create(Schema.Type.INT);
-        break;
-      case LONG:
-        schema = Schema.create(Schema.Type.LONG);
-        break;
-      case FLOAT:
-        schema = Schema.create(Schema.Type.FLOAT);
-        break;
-      case DOUBLE:
-        schema = Schema.create(Schema.Type.DOUBLE);
-        break;
-      case BOOLEAN:
-        schema = Schema.create(Schema.Type.BOOLEAN);
-        break;
-      case DECIMAL:
-        String precision = String.valueOf(typeInfo.getPrecision());
-        String scale = String.valueOf(typeInfo.getScale());
-        schema = getSchemaFor("{" +
-            "\"type\":\"bytes\"," +
-            "\"logicalType\":\"decimal\"," +
-            "\"precision\":" + precision + "," +
-            "\"scale\":" + scale + "}");
-        break;
-      case DATE:
-        schema = getSchemaFor("{" +
-            "\"type\":\"int\"," +
-            "\"logicalType\":\"date\"}");
-        break;
-      case TIMESTAMP:
-        schema = getSchemaFor("{" +
-            "\"type\":\"long\"," +
-            "\"logicalType\":\"timestamp-millis\"}");
-        break;
-      case LIST:
-        schema = createAvroArray(typeInfo);
-        break;
-      case MAP:
-        schema = createAvroMap(typeInfo);
-        break;
-      case STRUCT:
-        schema = createAvroRecord(typeInfo);
-        break;
-      case UNION:
-        schema = createAvroUnion(typeInfo);
-        break;
-      default:
-        throw new UnsupportedOperationException(typeInfo + " is not supported.");
-    }
-
-    return wrapInUnionWithNull(schema);
-  }
-
-  /**
-   * Build an Avro union from the children of an ORC union type.
-   *
-   * Each converted child comes back from createAvroSchema as a union with
-   * null, so the child unions are flattened into a single list and their
-   * null branches are dropped.
-   *
-   * @param typeInfo the ORC union type
-   * @return a union of the non-null branches of every child
-   */
-  private static Schema createAvroUnion(TypeDescription typeInfo) {
-    List<Schema> childSchemas = new ArrayList<>();
-    for (TypeDescription childTypeInfo : typeInfo.getChildren()) {
-      Schema childSchema = createAvroSchema(childTypeInfo);
-      if (childSchema.getType() == Schema.Type.UNION) {
-        for (Schema grandkid : childSchema.getTypes()) {
-          // Test the branch itself. Testing the enclosing union here would
-          // always be true and would let a null branch through for every
-          // child.
-          if (grandkid.getType() != Schema.Type.NULL) {
-            childSchemas.add(grandkid);
-          }
-        }
-      } else {
-        childSchemas.add(childSchema);
-      }
-    }
-
-    return Schema.createUnion(childSchemas);
-  }
-
-  /**
-   * Build an Avro record from an ORC struct type.
-   *
-   * The record is named "record_" followed by the type's id, and each field's
-   * doc string is the string form of the matching ORC child type.
-   *
-   * @param typeInfo the ORC struct type
-   * @return the Avro record schema with one field per struct field
-   */
-  private static Schema createAvroRecord(TypeDescription typeInfo) {
-    List<Schema.Field> childFields = new ArrayList<>();
-
-    List<String> fieldNames = typeInfo.getFieldNames();
-    List<TypeDescription> fieldTypes = typeInfo.getChildren();
-
-    for (int i = 0; i < fieldNames.size(); ++i) {
-      TypeDescription childTypeInfo = fieldTypes.get(i);
-      Schema.Field field = new Schema.Field(fieldNames.get(i),
-          createAvroSchema(childTypeInfo), childTypeInfo.toString(),
-          (Object) null);
-      childFields.add(field);
-    }
-
-    Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(),
-        typeInfo.toString(), null, false);
-    recordSchema.setFields(childFields);
-    return recordSchema;
-  }
-
-  /**
-   * Build an Avro map from an ORC map type.
-   *
-   * @param typeInfo the ORC map type
-   * @return an Avro map whose values use the converted value type
-   * @throws UnsupportedOperationException if the key type is not STRING
-   */
-  private static Schema createAvroMap(TypeDescription typeInfo) {
-    TypeDescription keyTypeInfo = typeInfo.getChildren().get(0);
-    if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) {
-      throw new UnsupportedOperationException("Avro only supports maps with string keys "
-          + typeInfo);
-    }
-
-    Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1));
-
-    return Schema.createMap(valueSchema);
-  }
-
-  /**
-   * Build an Avro array from an ORC list type.
-   *
-   * @param typeInfo the ORC list type
-   * @return an Avro array whose elements use the converted child type
-   */
-  private static Schema createAvroArray(TypeDescription typeInfo) {
-    Schema child = createAvroSchema(typeInfo.getChildren().get(0));
-    return Schema.createArray(child);
-  }
-
-  /**
-   * Make a schema nullable by putting it in a union with null.
-   *
-   * @param schema the schema to wrap
-   * @return the schema itself if it is already the null type; for a union,
-   *         a new union with null first followed by the existing non-null
-   *         branches; otherwise union(null, schema)
-   */
-  private static Schema wrapInUnionWithNull(Schema schema) {
-    Schema NULL = Schema.create(Schema.Type.NULL);
-    switch (schema.getType()) {
-      case NULL:
-        return schema;
-      case UNION:
-        List<Schema> kids = schema.getTypes();
-        List<Schema> newKids = new ArrayList<>(kids.size() + 1);
-        newKids.add(NULL);
-        // Carry the existing branches over; without this the union would
-        // contain nothing but null. Skip any null branch so that null is
-        // not listed twice.
-        for (Schema kid : kids) {
-          if (kid.getType() != Schema.Type.NULL) {
-            newKids.add(kid);
-          }
-        }
-        return Schema.createUnion(newKids);
-      default:
-        return Schema.createUnion(Arrays.asList(NULL, schema));
-    }
-  }
-
-  /**
-   * Parse a JSON schema definition with a fresh parser.
-   *
-   * @param str the JSON text of the schema
-   * @return the parsed schema
-   */
-  private static Schema getSchemaFor(String str) {
-    Schema.Parser parser = new Schema.Parser();
-    return parser.parse(str);
-  }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
deleted file mode 100644
index 094d115..0000000
--- a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.nio.Buffer;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Writes the rows of VectorizedRowBatch objects to an Avro data file.
- */
-public class AvroWriter {
-
-  /**
-   * Describe a struct schema as "columns" and "columns.types" properties,
-   * each a comma-separated list with one entry per top-level field.
-   *
-   * @param schema the schema to describe; its root must be a struct
-   * @return the properties holding the field names and field types
-   * @throws IllegalArgumentException if the root type is not a struct
-   */
-  static Properties setHiveSchema(TypeDescription schema) {
-    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
-      throw new IllegalArgumentException("Assumes struct type as root, not " +
-          schema);
-    }
-    StringBuilder fieldNames = new StringBuilder();
-    StringBuilder fieldTypes = new StringBuilder();
-    List<String> childNames = schema.getFieldNames();
-    List<TypeDescription> childTypes = schema.getChildren();
-    for (int f = 0; f < childNames.size(); ++f) {
-      if (f != 0) {
-        fieldNames.append(',');
-        fieldTypes.append(',');
-      }
-      fieldNames.append(childNames.get(f));
-      fieldTypes.append(childTypes.get(f).toString());
-    }
-    Properties properties = new Properties();
-    properties.put("columns", fieldNames.toString());
-    properties.put("columns.types", fieldTypes.toString());
-    return properties;
-  }
-
-  /**
-   * Converts one row of a column vector into the object written to Avro.
-   */
-  interface AvroConverter {
-    /**
-     * @param vector the column vector to read from
-     * @param row the row within the vector
-     * @return the converted value, or null if the row is null
-     */
-    Object convert(ColumnVector vector, int row);
-  }
-
-  /**
-   * Shared handling of repeating and null rows for every converter.
-   */
-  private abstract static class NullableConverter implements AvroConverter {
-    @Override
-    public final Object convert(ColumnVector cv, int row) {
-      // A repeating vector keeps its single value in slot 0.
-      if (cv.isRepeating) {
-        row = 0;
-      }
-      if (cv.noNulls || !cv.isNull[row]) {
-        return convertValue(cv, row);
-      }
-      return null;
-    }
-
-    /**
-     * Convert a row that is known to be non-null.
-     *
-     * @param cv the column vector to read from
-     * @param row the row, already adjusted for repeating vectors
-     * @return the converted value
-     */
-    abstract Object convertValue(ColumnVector cv, int row);
-  }
-
-  private static class BooleanConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      return ((LongColumnVector) cv).vector[row] != 0;
-    }
-  }
-
-  private static class IntConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      return (int) ((LongColumnVector) cv).vector[row];
-    }
-  }
-
-  private static class LongConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      return ((LongColumnVector) cv).vector[row];
-    }
-  }
-
-  private static class FloatConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      return (float) ((DoubleColumnVector) cv).vector[row];
-    }
-  }
-
-  private static class DoubleConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      return ((DoubleColumnVector) cv).vector[row];
-    }
-  }
-
-  private static class StringConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      BytesColumnVector vector = (BytesColumnVector) cv;
-      // Decode with an explicit charset so the result does not depend on
-      // the platform default.
-      // NOTE(review): assumes the column bytes are UTF-8 - confirm.
-      return new String(vector.vector[row], vector.start[row],
-          vector.length[row], java.nio.charset.StandardCharsets.UTF_8);
-    }
-  }
-
-  private static class BinaryConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      BytesColumnVector vector = (BytesColumnVector) cv;
-      return ByteBuffer.wrap(vector.vector[row], vector.start[row],
-          vector.length[row]);
-    }
-  }
-
-  private static class TimestampConverter extends NullableConverter {
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      return ((TimestampColumnVector) cv).time[row];
-    }
-  }
-
-  private static class DecimalConverter extends NullableConverter {
-    final int scale;
-
-    DecimalConverter(int scale) {
-      this.scale = scale;
-    }
-
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      DecimalColumnVector vector = (DecimalColumnVector) cv;
-      return getBufferFromDecimal(
-          vector.vector[row].getHiveDecimal(), scale);
-    }
-  }
-
-  private static class ListConverter extends NullableConverter {
-    final Schema avroSchema;
-    final AvroConverter childConverter;
-
-    ListConverter(TypeDescription schema, Schema avroSchema) {
-      this.avroSchema = avroSchema;
-      childConverter = createConverter(schema.getChildren().get(0),
-          removeNullable(avroSchema.getElementType()));
-    }
-
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      ListColumnVector vector = (ListColumnVector) cv;
-      int offset = (int) vector.offsets[row];
-      int length = (int) vector.lengths[row];
-      GenericData.Array<Object> result =
-          new GenericData.Array<>(length, avroSchema);
-      // The elements of this row are the child slots [offset, offset+length).
-      for (int i = 0; i < length; ++i) {
-        result.add(childConverter.convert(vector.child, offset + i));
-      }
-      return result;
-    }
-  }
-
-  private static class StructConverter extends NullableConverter {
-    final Schema avroSchema;
-    final AvroConverter[] childConverters;
-
-    StructConverter(TypeDescription schema, Schema avroSchema) {
-      this.avroSchema = avroSchema;
-      List<TypeDescription> childrenTypes = schema.getChildren();
-      childConverters = new AvroConverter[childrenTypes.size()];
-      List<Schema.Field> fields = avroSchema.getFields();
-      for (int f = 0; f < childConverters.length; ++f) {
-        childConverters[f] = createConverter(childrenTypes.get(f),
-            removeNullable(fields.get(f).schema()));
-      }
-    }
-
-    @Override
-    Object convertValue(ColumnVector cv, int row) {
-      StructColumnVector vector = (StructColumnVector) cv;
-      GenericData.Record result = new GenericData.Record(avroSchema);
-      for (int f = 0; f < childConverters.length; ++f) {
-        result.put(f, childConverters[f].convert(vector.fields[f], row));
-      }
-      return result;
-    }
-  }
-
-  /**
-   * Create the converter for an ORC type.
-   *
-   * @param types the ORC type of the column
-   * @param avroSchema the Avro schema of the column with the nullable
-   *        wrapper already removed; only used for lists and structs
-   * @return the converter for the column
-   * @throws IllegalArgumentException if the type's category is not handled
-   */
-  static AvroConverter createConverter(TypeDescription types,
-                                       Schema avroSchema) {
-    switch (types.getCategory()) {
-      case BINARY:
-        return new BinaryConverter();
-      case BOOLEAN:
-        return new BooleanConverter();
-      case BYTE:
-      case SHORT:
-      case INT:
-        return new IntConverter();
-      case LONG:
-        return new LongConverter();
-      case FLOAT:
-        return new FloatConverter();
-      case DOUBLE:
-        return new DoubleConverter();
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        return new StringConverter();
-      case TIMESTAMP:
-        return new TimestampConverter();
-      case DECIMAL:
-        return new DecimalConverter(types.getScale());
-      case LIST:
-        return new ListConverter(types, avroSchema);
-      case STRUCT:
-        return new StructConverter(types, avroSchema);
-      default:
-        throw new IllegalArgumentException("Unhandled type " + types);
-    }
-  }
-
-  /**
-   * Remove the union(null, ...) wrapper around the schema.
-   *
-   * All of the types in Hive are nullable and in Avro those are represented
-   * by wrapping each type in a union type with the null type. Only unions of
-   * exactly two branches whose first branch is null are unwrapped.
-   * @param avro The avro type
-   * @return The avro type with the nullable layer removed
-   */
-  static Schema removeNullable(Schema avro) {
-    while (avro.getType() == Schema.Type.UNION) {
-      List<Schema> children = avro.getTypes();
-      if (children.size() == 2 &&
-          children.get(0).getType() == Schema.Type.NULL) {
-        avro = children.get(1);
-      } else {
-        break;
-      }
-    }
-    return avro;
-  }
-
-  private final AvroConverter[] converters;
-  private final DataFileWriter<GenericRecord> writer;
-  private final GenericRecord record;
-
-  /**
-   * Create a writer for a new Avro file.
-   *
-   * @param path the file to create
-   * @param schema the ORC schema of the rows that will be written
-   * @param conf the configuration used to find the file system
-   * @param compression the name of the Avro codec to use; null or the empty
-   *        string leaves the writer's codec unset
-   * @throws IOException if the file cannot be created
-   */
-  public AvroWriter(Path path, TypeDescription schema,
-                    Configuration conf,
-                    String compression) throws IOException {
-    List<TypeDescription> childTypes = schema.getChildren();
-    Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema);
-    List<Schema.Field> avroFields = avroSchema.getFields();
-    converters = new AvroConverter[childTypes.size()];
-    for (int c = 0; c < converters.length; ++c) {
-      converters[c] = createConverter(childTypes.get(c),
-          removeNullable(avroFields.get(c).schema()));
-    }
-    GenericDatumWriter<GenericRecord> gdw =
-        new GenericDatumWriter<>(avroSchema);
-    writer = new DataFileWriter<>(gdw);
-    // Short-circuit so the emptiness check is only made on a non-null name.
-    if (compression != null && !compression.isEmpty()) {
-      writer.setCodec(CodecFactory.fromString(compression));
-    }
-    writer.create(avroSchema, path.getFileSystem(conf).create(path));
-    record = new GenericData.Record(avroSchema);
-  }
-
-  /**
-   * Append every row of the batch to the file.
-   *
-   * @param batch the rows to write
-   * @throws IOException if a record cannot be appended
-   */
-  public void writeBatch(VectorizedRowBatch batch) throws IOException {
-    for (int r = 0; r < batch.size; ++r) {
-      // The same record object is refilled and appended for every row.
-      for (int f = 0; f < batch.cols.length; ++f) {
-        record.put(f, converters[f].convert(batch.cols[f], r));
-      }
-      writer.append(record);
-    }
-  }
-
-  /**
-   * Close the underlying Avro file writer.
-   *
-   * @throws IOException if closing the writer fails
-   */
-  public void close() throws IOException {
-    writer.close();
-  }
-
-  /**
-   * Wrap a byte array in a buffer positioned at its start.
-   *
-   * @param input the bytes to wrap
-   * @return a buffer over the whole array
-   */
-  static Buffer getBufferFromBytes(byte[] input) {
-    ByteBuffer bb = ByteBuffer.wrap(input);
-    return bb.rewind();
-  }
-
-  /**
-   * Encode a decimal as the bytes of its unscaled value at the given scale.
-   *
-   * @param dec the decimal to encode, which may be null
-   * @param scale the scale to set on the decimal before encoding
-   * @return the buffer holding the unscaled value, or null if dec is null
-   */
-  public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) {
-    if (dec == null) {
-      return null;
-    }
-
-    dec = dec.setScale(scale);
-    return getBufferFromBytes(dec.unscaledValue().toByteArray());
-  }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/CsvReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/CsvReader.java
deleted file mode 100644
index 5c86a89..0000000
--- a/java/bench/src/java/org/apache/orc/bench/CsvReader.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.bench;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.zip.GZIPInputStream;
-
-/**
- * Reads a CSV file, optionally gzip-compressed, into VectorizedRowBatch
- * objects. An empty field is stored as a null value.
- */
-public class CsvReader {
-  private final Iterator<CSVRecord> parser;
-  private final ColumnReader[] readers;
-
-  /**
-   * Parses one CSV field into one row of a column vector.
-   */
-  interface ColumnReader {
-    /**
-     * @param value the text of the field
-     * @param vect the column vector to fill
-     * @param row the row within the vector to set
-     */
-    void read(String value, ColumnVector vect, int row);
-  }
-
-  static class LongColumnReader implements ColumnReader {
-    @Override
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        LongColumnVector vector = (LongColumnVector) vect;
-        vector.vector[row] = Long.parseLong(value);
-      }
-    }
-  }
-
-  static class DoubleColumnReader implements ColumnReader {
-    @Override
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        DoubleColumnVector vector = (DoubleColumnVector) vect;
-        vector.vector[row] = Double.parseDouble(value);
-      }
-    }
-  }
-
-  static class StringColumnReader implements ColumnReader {
-    @Override
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        BytesColumnVector vector = (BytesColumnVector) vect;
-        // Encode with an explicit charset so the stored bytes do not depend
-        // on the platform default.
-        byte[] bytes = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
-        vector.setRef(row, bytes, 0, bytes.length);
-      }
-    }
-  }
-
-  static class TimestampColumnReader implements ColumnReader {
-    @Override
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        TimestampColumnVector vector = (TimestampColumnVector) vect;
-        vector.set(row, Timestamp.valueOf(value));
-      }
-    }
-  }
-
-  static class DecimalColumnReader implements ColumnReader {
-    @Override
-    public void read(String value, ColumnVector vect, int row) {
-      if ("".equals(value)) {
-        vect.noNulls = false;
-        vect.isNull[row] = true;
-      } else {
-        DecimalColumnVector vector = (DecimalColumnVector) vect;
-        vector.vector[row].set(HiveDecimal.create(value));
-      }
-    }
-  }
-
-  /**
-   * Pick the field parser for a column type.
-   *
-   * @param schema the type of the column
-   * @return the reader that parses fields of that type
-   * @throws IllegalArgumentException if the type's category is not handled
-   */
-  ColumnReader createReader(TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        return new LongColumnReader();
-      case FLOAT:
-      case DOUBLE:
-        return new DoubleColumnReader();
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        return new StringColumnReader();
-      case DECIMAL:
-        return new DecimalColumnReader();
-      case TIMESTAMP:
-        return new TimestampColumnReader();
-      default:
-        throw new IllegalArgumentException("Unhandled type " + schema);
-    }
-  }
-
-  /**
-   * Open a CSV file whose first record is a header.
-   *
-   * A file whose name ends in ".gz" is decompressed while it is read.
-   *
-   * @param path the file to read
-   * @param conf the configuration used to find the file system
-   * @param schema the struct type whose children give the column types
-   * @throws IOException if the file cannot be opened or its header read
-   */
-  public CsvReader(Path path,
-                   Configuration conf,
-                   TypeDescription schema) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FSDataInputStream raw = fs.open(path);
-    Iterator<CSVRecord> records;
-    try {
-      InputStream input = raw;
-      if (path.getName().endsWith(".gz")) {
-        input = new DataInputStream(new GZIPInputStream(raw));
-      }
-      // Decode with an explicit charset rather than the platform default.
-      // NOTE(review): assumes the input files are UTF-8 - confirm.
-      records = new CSVParser(
-          new InputStreamReader(input,
-              java.nio.charset.StandardCharsets.UTF_8),
-          CSVFormat.RFC4180.withHeader()).iterator();
-    } catch (IOException | RuntimeException e) {
-      // Do not leak the open file if wrapping or parsing fails.
-      try {
-        raw.close();
-      } catch (IOException closeFailure) {
-        e.addSuppressed(closeFailure);
-      }
-      throw e;
-    }
-    parser = records;
-    List<TypeDescription> columnTypes = schema.getChildren();
-    readers = new ColumnReader[columnTypes.size()];
-    int c = 0;
-    for (TypeDescription columnType : columnTypes) {
-      readers[c++] = createReader(columnType);
-    }
-  }
-
-  /**
-   * Fill the batch with the next records of the file.
-   *
-   * @param batch the batch to reset and fill
-   * @return true if at least one row was read into the batch
-   * @throws IOException declared for callers; not thrown directly here
-   */
-  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
-    batch.reset();
-    int maxSize = batch.getMaxSize();
-    while (parser.hasNext() && batch.size < maxSize) {
-      CSVRecord record = parser.next();
-      int c = 0;
-      // The fields are matched to the columns by position.
-      for (String val : record) {
-        readers[c].read(val, batch.cols[c], batch.size);
-        c += 1;
-      }
-      batch.size++;
-    }
-    return batch.size != 0;
-  }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/CsvScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/CsvScan.java
deleted file mode 100644
index f2ec61a..0000000
--- a/java/bench/src/java/org/apache/orc/bench/CsvScan.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-/**
- * Command-line helper that scans CSV files and counts their rows.
- */
-public class CsvScan {
-  /**
-   * Reads each named CSV file batch by batch, using the schema that
-   * TaxiToOrc.loadSchema returns for "nyc-taxi.schema", and prints the
-   * total number of rows read.
-   *
-   * @param args paths of the CSV files to scan
-   * @throws Exception if the schema or a file cannot be read
-   */
-  public static void main(String[] args) throws Exception {
-    final Configuration conf = new Configuration();
-    final TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
-    long total = 0;
-    for (int i = 0; i < args.length; ++i) {
-      CsvReader reader = new CsvReader(new Path(args[i]), conf, schema);
-      VectorizedRowBatch batch = schema.createRowBatch();
-      // Keep pulling batches until the reader reports an empty one.
-      boolean more = reader.nextBatch(batch);
-      while (more) {
-        total += batch.size;
-        more = reader.nextBatch(batch);
-      }
-    }
-    System.out.println("Rows read: " + total);
-  }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
index 982db64..ee882e7 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.json.JsonReader;
public class GithubToAvro {
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
index f5ae6b1..1dd23de 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java
@@ -22,11 +22,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.json.JsonReader;
import org.apache.orc.tools.FileDump;
import java.io.OutputStreamWriter;
import java.io.Writer;
-import java.util.zip.GZIPOutputStream;
public class GithubToJson {
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
index cbc1997..ebd6443 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
@@ -24,9 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
-
-import java.io.IOException;
-import java.io.InputStream;
+import org.apache.orc.bench.json.JsonReader;
public class GithubToOrc {
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
index e1fafdc..db88c52 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.json.JsonReader;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/JsonReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/JsonReader.java
deleted file mode 100644
index 599c872..0000000
--- a/java/bench/src/java/org/apache/orc/bench/JsonReader.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.bench;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonStreamParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.zip.GZIPInputStream;
-
-public class JsonReader {
- private final TypeDescription schema;
- private final JsonStreamParser parser;
- private final JsonConverter[] converters;
-
- interface JsonConverter {
- void convert(JsonElement value, ColumnVector vect, int row);
- }
-
- static class BooleanColumnConverter implements JsonConverter {
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- LongColumnVector vector = (LongColumnVector) vect;
- vector.vector[row] = value.getAsBoolean() ? 1 : 0;
- }
- }
- }
-
- static class LongColumnConverter implements JsonConverter {
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- LongColumnVector vector = (LongColumnVector) vect;
- vector.vector[row] = value.getAsLong();
- }
- }
- }
-
- static class DoubleColumnConverter implements JsonConverter {
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- DoubleColumnVector vector = (DoubleColumnVector) vect;
- vector.vector[row] = value.getAsDouble();
- }
- }
- }
-
- static class StringColumnConverter implements JsonConverter {
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- BytesColumnVector vector = (BytesColumnVector) vect;
- byte[] bytes = value.getAsString().getBytes();
- vector.setRef(row, bytes, 0, bytes.length);
- }
- }
- }
-
- static class BinaryColumnConverter implements JsonConverter {
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- BytesColumnVector vector = (BytesColumnVector) vect;
- String binStr = value.getAsString();
- byte[] bytes = new byte[binStr.length()/2];
- for(int i=0; i < bytes.length; ++i) {
- bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16);
- }
- vector.setRef(row, bytes, 0, bytes.length);
- }
- }
- }
-
- static class TimestampColumnConverter implements JsonConverter {
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- TimestampColumnVector vector = (TimestampColumnVector) vect;
- vector.set(row, Timestamp.valueOf(value.getAsString()
- .replaceAll("[TZ]", " ")));
- }
- }
- }
-
- static class DecimalColumnConverter implements JsonConverter {
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- DecimalColumnVector vector = (DecimalColumnVector) vect;
- vector.vector[row].set(HiveDecimal.create(value.getAsString()));
- }
- }
- }
-
- static class StructColumnConverter implements JsonConverter {
- private JsonConverter[] childrenConverters;
- private List<String> fieldNames;
-
- public StructColumnConverter(TypeDescription schema) {
- List<TypeDescription> kids = schema.getChildren();
- childrenConverters = new JsonConverter[kids.size()];
- for(int c=0; c < childrenConverters.length; ++c) {
- childrenConverters[c] = createConverter(kids.get(c));
- }
- fieldNames = schema.getFieldNames();
- }
-
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- StructColumnVector vector = (StructColumnVector) vect;
- JsonObject obj = value.getAsJsonObject();
- for(int c=0; c < childrenConverters.length; ++c) {
- JsonElement elem = obj.get(fieldNames.get(c));
- childrenConverters[c].convert(elem, vector.fields[c], row);
- }
- }
- }
- }
-
- static class ListColumnConverter implements JsonConverter {
- private JsonConverter childrenConverter;
-
- public ListColumnConverter(TypeDescription schema) {
- childrenConverter = createConverter(schema.getChildren().get(0));
- }
-
- public void convert(JsonElement value, ColumnVector vect, int row) {
- if (value == null || value.isJsonNull()) {
- vect.noNulls = false;
- vect.isNull[row] = true;
- } else {
- ListColumnVector vector = (ListColumnVector) vect;
- JsonArray obj = value.getAsJsonArray();
- vector.lengths[row] = obj.size();
- vector.offsets[row] = vector.childCount;
- vector.childCount += vector.lengths[row];
- vector.child.ensureSize(vector.childCount, true);
- for(int c=0; c < obj.size(); ++c) {
- childrenConverter.convert(obj.get(c), vector.child,
- (int) vector.offsets[row] + c);
- }
- }
- }
- }
-
- static JsonConverter createConverter(TypeDescription schema) {
- switch (schema.getCategory()) {
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- return new LongColumnConverter();
- case FLOAT:
- case DOUBLE:
- return new DoubleColumnConverter();
- case CHAR:
- case VARCHAR:
- case STRING:
- return new StringColumnConverter();
- case DECIMAL:
- return new DecimalColumnConverter();
- case TIMESTAMP:
- return new TimestampColumnConverter();
- case BINARY:
- return new BinaryColumnConverter();
- case BOOLEAN:
- return new BooleanColumnConverter();
- case STRUCT:
- return new StructColumnConverter(schema);
- case LIST:
- return new ListColumnConverter(schema);
- default:
- throw new IllegalArgumentException("Unhandled type " + schema);
- }
- }
-
- public JsonReader(Path path,
- Configuration conf,
- TypeDescription schema) throws IOException {
- this.schema = schema;
- FileSystem fs = path.getFileSystem(conf);
- FSDataInputStream raw = fs.open(path);
- String name = path.getName();
- int lastDot = name.lastIndexOf(".");
- InputStream input = raw;
- if (lastDot >= 0) {
- if (".gz".equals(name.substring(lastDot))) {
- input = new GZIPInputStream(raw);
- }
- }
- parser = new JsonStreamParser(new InputStreamReader(input));
- if (schema.getCategory() != TypeDescription.Category.STRUCT) {
- throw new IllegalArgumentException("Root must be struct - " + schema);
- }
- List<TypeDescription> fieldTypes = schema.getChildren();
- converters = new JsonConverter[fieldTypes.size()];
- for(int c = 0; c < converters.length; ++c) {
- converters[c] = createConverter(fieldTypes.get(c));
- }
- }
-
- public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
- batch.reset();
- int maxSize = batch.getMaxSize();
- List<String> fieldNames = schema.getFieldNames();
- while (parser.hasNext() && batch.size < maxSize) {
- JsonObject elem = parser.next().getAsJsonObject();
- for(int c=0; c < converters.length; ++c) {
- // look up each field to see if it is in the input, otherwise
- // set it to null.
- JsonElement field = elem.get(fieldNames.get(c));
- if (field == null) {
- batch.cols[c].noNulls = false;
- batch.cols[c].isNull[batch.size] = true;
- } else {
- converters[c].convert(field, batch.cols[c], batch.size);
- }
- }
- batch.size++;
- }
- return batch.size != 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/JsonScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/JsonScan.java b/java/bench/src/java/org/apache/orc/bench/JsonScan.java
deleted file mode 100644
index 1115ae6..0000000
--- a/java/bench/src/java/org/apache/orc/bench/JsonScan.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import com.google.gson.JsonStreamParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.zip.GZIPInputStream;
-
-public class JsonScan {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
- long rowCount = 0;
- for(String filename: args) {
- Path path = new Path(filename);
- FileSystem fs = path.getFileSystem(conf);
- FSDataInputStream raw = fs.open(path);
- int lastDot = filename.lastIndexOf(".");
- InputStream input = raw;
- if (lastDot >= 0) {
- if (".gz".equals(filename.substring(lastDot))) {
- input = new GZIPInputStream(raw);
- }
- }
- JsonStreamParser parser =
- new JsonStreamParser(new InputStreamReader(input));
- while (parser.hasNext()) {
- parser.next();
- rowCount += 1;
- }
- }
- System.out.println("Rows read: " + rowCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/OrcScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/OrcScan.java b/java/bench/src/java/org/apache/orc/bench/OrcScan.java
deleted file mode 100644
index 096f3fa..0000000
--- a/java/bench/src/java/org/apache/orc/bench/OrcScan.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-public class OrcScan {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
- long rowCount = 0;
- for(String filename: args) {
- Reader reader = OrcFile.createReader(new Path(filename), options);
- TypeDescription schema = reader.getSchema();
- RecordReader rows = reader.rows();
- VectorizedRowBatch batch = schema.createRowBatch();
- while (rows.nextBatch(batch)) {
- rowCount += batch.size;
- }
- rows.close();
- }
- System.out.println("Rows read: " + rowCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/ParquetScan.java b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java
deleted file mode 100644
index ccaaa2a..0000000
--- a/java/bench/src/java/org/apache/orc/bench/ParquetScan.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.bench;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.parquet.hadoop.ParquetInputFormat;
-
-public class ParquetScan {
- public static void main(String[] args) throws Exception {
- JobConf conf = new JobConf();
- long rowCount = 0;
- ParquetInputFormat<ArrayWritable> inputFormat =
- new ParquetInputFormat<>(DataWritableReadSupport.class);
-
- NullWritable nada = NullWritable.get();
- for(String filename: args) {
- FileSplit split = new FileSplit(new Path(filename), 0, Long.MAX_VALUE,
- new String[]{});
- RecordReader<NullWritable,ArrayWritable> recordReader =
- new ParquetRecordReaderWrapper(inputFormat, split, conf,
- Reporter.NULL);
- ArrayWritable value = recordReader.createValue();
- while (recordReader.next(nada, value)) {
- rowCount += 1;
- }
- recordReader.close();
- }
- System.out.println("Rows read: " + rowCount);
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
index d4fd4a2..900be66 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
public class SalesToAvro {
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
index 985da90..3da900f 100644
--- a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
import java.util.Properties;
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
index b490a8a..2b14f50 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.csv.CsvReader;
public class TaxiToAvro {
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
index 98fbe17..4b8ca8c 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.csv.CsvReader;
import org.apache.orc.tools.FileDump;
import org.iq80.snappy.SnappyOutputStream;
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
index dee5da6..2588c72 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
@@ -25,6 +25,7 @@ import org.apache.orc.OrcFile;
import org.apache.orc.CompressionKind;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.bench.csv.CsvReader;
import java.io.IOException;
import java.io.InputStream;
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
index 3edce17..3eafc87 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.avro.AvroWriter;
+import org.apache.orc.bench.csv.CsvReader;
import java.util.Properties;
public class TaxiToParquet {
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java
new file mode 100644
index 0000000..1292c2b
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Command-line tool that counts the records in one or more Avro data files.
+ * <p>
+ * Each program argument is used as the path of an Avro file. Every record
+ * is decoded, and the total across all files is printed as
+ * {@code Rows read: N}.
+ */
+public class AvroScan {
+  /**
+   * Scan each file named on the command line and print the total row count.
+   *
+   * @param args paths of the Avro files to read
+   * @throws Exception if a file cannot be opened or decoded
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    long rowCount = 0;
+    for(String filename: args) {
+      FsInput file = new FsInput(new Path(filename), conf);
+      DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
+      // Close the reader once the file has been scanned (or if decoding
+      // fails part way) so a long argument list does not accumulate open
+      // file handles.
+      try (DataFileReader<GenericRecord> dataFileReader =
+               new DataFileReader<>(file, datumReader)) {
+        // Hand the previous record back to next() so it can be reused.
+        GenericRecord record = null;
+        while (dataFileReader.hasNext()) {
+          record = dataFileReader.next(record);
+          rowCount += 1;
+        }
+      }
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java
new file mode 100644
index 0000000..5df7b70
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.avro;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Convert Hive TypeInfo to an Avro Schema
+ */
+public class AvroSchemaUtils {
+
+ private AvroSchemaUtils() {
+ // No instances
+ }
+
+ public static Schema createAvroSchema(TypeDescription typeInfo) {
+ Schema schema;
+ switch (typeInfo.getCategory()) {
+ case STRING:
+ schema = Schema.create(Schema.Type.STRING);
+ break;
+ case CHAR:
+ schema = getSchemaFor("{" +
+ "\"type\":\"string\"," +
+ "\"logicalType\":\"char\"," +
+ "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+ break;
+ case VARCHAR:
+ schema = getSchemaFor("{" +
+ "\"type\":\"string\"," +
+ "\"logicalType\":\"varchar\"," +
+ "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+ break;
+ case BINARY:
+ schema = Schema.create(Schema.Type.BYTES);
+ break;
+ case BYTE:
+ schema = Schema.create(Schema.Type.INT);
+ break;
+ case SHORT:
+ schema = Schema.create(Schema.Type.INT);
+ break;
+ case INT:
+ schema = Schema.create(Schema.Type.INT);
+ break;
+ case LONG:
+ schema = Schema.create(Schema.Type.LONG);
+ break;
+ case FLOAT:
+ schema = Schema.create(Schema.Type.FLOAT);
+ break;
+ case DOUBLE:
+ schema = Schema.create(Schema.Type.DOUBLE);
+ break;
+ case BOOLEAN:
+ schema = Schema.create(Schema.Type.BOOLEAN);
+ break;
+ case DECIMAL:
+ String precision = String.valueOf(typeInfo.getPrecision());
+ String scale = String.valueOf(typeInfo.getScale());
+ schema = getSchemaFor("{" +
+ "\"type\":\"bytes\"," +
+ "\"logicalType\":\"decimal\"," +
+ "\"precision\":" + precision + "," +
+ "\"scale\":" + scale + "}");
+ break;
+ case DATE:
+ schema = getSchemaFor("{" +
+ "\"type\":\"int\"," +
+ "\"logicalType\":\"date\"}");
+ break;
+ case TIMESTAMP:
+ schema = getSchemaFor("{" +
+ "\"type\":\"long\"," +
+ "\"logicalType\":\"timestamp-millis\"}");
+ break;
+ case LIST:
+ schema = createAvroArray(typeInfo);
+ break;
+ case MAP:
+ schema = createAvroMap(typeInfo);
+ break;
+ case STRUCT:
+ schema = createAvroRecord(typeInfo);
+ break;
+ case UNION:
+ schema = createAvroUnion(typeInfo);
+ break;
+ default:
+ throw new UnsupportedOperationException(typeInfo + " is not supported.");
+ }
+
+ return wrapInUnionWithNull(schema);
+ }
+
+ private static Schema createAvroUnion(TypeDescription typeInfo) {
+ List<Schema> childSchemas = new ArrayList<>();
+ for (TypeDescription childTypeInfo : typeInfo.getChildren()) {
+ Schema childSchema = createAvroSchema(childTypeInfo);
+ if (childSchema.getType() == Schema.Type.UNION) {
+ for (Schema grandkid: childSchema.getTypes()) {
+ if (childSchema.getType() != Schema.Type.NULL) {
+ childSchemas.add(grandkid);
+ }
+ }
+ } else {
+ childSchemas.add(childSchema);
+ }
+ }
+
+ return Schema.createUnion(childSchemas);
+ }
+
+ private static Schema createAvroRecord(TypeDescription typeInfo) {
+ List<Schema.Field> childFields = new ArrayList<>();
+
+ List<String> fieldNames = typeInfo.getFieldNames();
+ List<TypeDescription> fieldTypes = typeInfo.getChildren();
+
+ for (int i = 0; i < fieldNames.size(); ++i) {
+ TypeDescription childTypeInfo = fieldTypes.get(i);
+ Schema.Field field = new Schema.Field(fieldNames.get(i),
+ createAvroSchema(childTypeInfo), childTypeInfo.toString(),
+ (Object) null);
+ childFields.add(field);
+ }
+
+ Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(),
+ typeInfo.toString(), null, false);
+ recordSchema.setFields(childFields);
+ return recordSchema;
+ }
+
+ private static Schema createAvroMap(TypeDescription typeInfo) {
+ TypeDescription keyTypeInfo = typeInfo.getChildren().get(0);
+ if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) {
+ throw new UnsupportedOperationException("Avro only supports maps with string keys "
+ + typeInfo);
+ }
+
+ Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1));
+
+ return Schema.createMap(valueSchema);
+ }
+
+ private static Schema createAvroArray(TypeDescription typeInfo) {
+ Schema child = createAvroSchema(typeInfo.getChildren().get(0));
+ return Schema.createArray(child);
+ }
+
+ private static Schema wrapInUnionWithNull(Schema schema) {
+ Schema NULL = Schema.create(Schema.Type.NULL);
+ switch (schema.getType()) {
+ case NULL:
+ return schema;
+ case UNION:
+ List<Schema> kids = schema.getTypes();
+ List<Schema> newKids = new ArrayList<>(kids.size() + 1);
+ newKids.add(NULL);
+ return Schema.createUnion(newKids);
+ default:
+ return Schema.createUnion(Arrays.asList(NULL, schema));
+ }
+ }
+
+ private static Schema getSchemaFor(String str) {
+ Schema.Parser parser = new Schema.Parser();
+ return parser.parse(str);
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
new file mode 100644
index 0000000..f9d3bad
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+public class AvroWriter {
+
+ /**
+  * Describes a struct schema as two comma-separated property values.
+  *
+  * @param schema the root type; must be a STRUCT
+  * @return properties with {@code "columns"} set to the joined field names
+  *         and {@code "columns.types"} set to the joined field type strings
+  * @throws IllegalArgumentException if {@code schema} is not a STRUCT
+  */
+ static Properties setHiveSchema(TypeDescription schema) {
+   if (schema.getCategory() != TypeDescription.Category.STRUCT) {
+     throw new IllegalArgumentException("Assumes struct type as root, not " +
+         schema);
+   }
+   List<String> childNames = schema.getFieldNames();
+   List<TypeDescription> childTypes = schema.getChildren();
+   StringBuilder names = new StringBuilder();
+   StringBuilder types = new StringBuilder();
+   // Empty before the first column, a comma before every later one.
+   String separator = "";
+   for (int column = 0; column < childNames.size(); ++column) {
+     names.append(separator).append(childNames.get(column));
+     types.append(separator).append(childTypes.get(column).toString());
+     separator = ",";
+   }
+   Properties result = new Properties();
+   result.put("columns", names.toString());
+   result.put("columns.types", types.toString());
+   return result;
+ }
+
+ /**
+ * Converts one entry of a column vector into the object stored in the
+ * Avro record. The implementations in this class return null when the
+ * entry is marked null in the vector.
+ */
+ interface AvroConverter {
+ // vector: the column to read from; row: the entry index within it.
+ Object convert(ColumnVector vector, int row);
+ }
+
+ private static class BooleanConverter implements AvroConverter {
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ LongColumnVector vector = (LongColumnVector) cv;
+ return vector.vector[row] != 0;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class IntConverter implements AvroConverter {
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ LongColumnVector vector = (LongColumnVector) cv;
+ return (int) vector.vector[row];
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class LongConverter implements AvroConverter {
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ LongColumnVector vector = (LongColumnVector) cv;
+ return vector.vector[row];
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class FloatConverter implements AvroConverter {
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ DoubleColumnVector vector = (DoubleColumnVector) cv;
+ return (float) vector.vector[row];
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class DoubleConverter implements AvroConverter {
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ DoubleColumnVector vector = (DoubleColumnVector) cv;
+ return vector.vector[row];
+ } else {
+ return null;
+ }
+ }
+ }
+
+ /**
+  * Reads a BytesColumnVector entry as a String, decoding the byte slice
+  * described by the vector's start/length arrays.
+  */
+ private static class StringConverter implements AvroConverter {
+   public Object convert(ColumnVector cv, int row) {
+     // A repeating vector keeps its single value in slot 0.
+     if (cv.isRepeating) {
+       row = 0;
+     }
+     if (cv.noNulls || !cv.isNull[row]) {
+       BytesColumnVector vector = (BytesColumnVector) cv;
+       // Decode explicitly as UTF-8 so the result does not depend on the
+       // JVM's platform default charset.
+       return new String(vector.vector[row], vector.start[row],
+           vector.length[row], java.nio.charset.StandardCharsets.UTF_8);
+     } else {
+       return null;
+     }
+   }
+ }
+
+ private static class BinaryConverter implements AvroConverter {
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ BytesColumnVector vector = (BytesColumnVector) cv;
+ return ByteBuffer.wrap(vector.vector[row], vector.start[row],
+ vector.length[row]);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class TimestampConverter implements AvroConverter {
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ TimestampColumnVector vector = (TimestampColumnVector) cv;
+ return vector.time[row];
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class DecimalConverter implements AvroConverter {
+ final int scale;
+ DecimalConverter(int scale) {
+ this.scale = scale;
+ }
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ DecimalColumnVector vector = (DecimalColumnVector) cv;
+ return getBufferFromDecimal(
+ vector.vector[row].getHiveDecimal(), scale);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ /**
+  * Converts a ListColumnVector entry into a GenericData.Array, converting
+  * each element with the converter built for the list's child type.
+  */
+ private static class ListConverter implements AvroConverter {
+   final Schema avroSchema;
+   final AvroConverter childConverter;
+
+   ListConverter(TypeDescription schema, Schema avroSchema) {
+     this.avroSchema = avroSchema;
+     TypeDescription elementType = schema.getChildren().get(0);
+     Schema elementSchema = removeNullable(avroSchema.getElementType());
+     childConverter = createConverter(elementType, elementSchema);
+   }
+
+   public Object convert(ColumnVector cv, int row) {
+     // A repeating vector keeps its single value in slot 0.
+     int index = cv.isRepeating ? 0 : row;
+     if (!cv.noNulls && cv.isNull[index]) {
+       return null;
+     }
+     ListColumnVector list = (ListColumnVector) cv;
+     // offsets/lengths locate this entry's elements inside the child vector.
+     int first = (int) list.offsets[index];
+     int count = (int) list.lengths[index];
+     GenericData.Array elements = new GenericData.Array(count, avroSchema);
+     int position = 0;
+     while (position < count) {
+       elements.add(childConverter.convert(list.child, first + position));
+       ++position;
+     }
+     return elements;
+   }
+ }
+
+ private static class StructConverter implements AvroConverter {
+ final Schema avroSchema;
+ final AvroConverter[] childConverters;
+
+ StructConverter(TypeDescription schema, Schema avroSchema) {
+ this.avroSchema = avroSchema;
+ List<TypeDescription> childrenTypes = schema.getChildren();
+ childConverters = new AvroConverter[childrenTypes.size()];
+ List<Schema.Field> fields = avroSchema.getFields();
+ for(int f=0; f < childConverters.length; ++f) {
+ childConverters[f] = createConverter(childrenTypes.get(f),
+ removeNullable(fields.get(f).schema()));
+ }
+ }
+
+ public Object convert(ColumnVector cv, int row) {
+ if (cv.isRepeating) {
+ row = 0;
+ }
+ if (cv.noNulls || !cv.isNull[row]) {
+ StructColumnVector vector = (StructColumnVector) cv;
+ GenericData.Record result = new GenericData.Record(avroSchema);
+ for(int f=0; f < childConverters.length; ++f) {
+ result.put(f, childConverters[f].convert(vector.fields[f], row));
+ }
+ return result;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ static AvroConverter createConverter(TypeDescription types,
+ Schema avroSchema) {
+ switch (types.getCategory()) {
+ case BINARY:
+ return new BinaryConverter();
+ case BOOLEAN:
+ return new BooleanConverter();
+ case BYTE:
+ case SHORT:
+ case INT:
+ return new IntConverter();
+ case LONG:
+ return new LongConverter();
+ case FLOAT:
+ return new FloatConverter();
+ case DOUBLE:
+ return new DoubleConverter();
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return new StringConverter();
+ case TIMESTAMP:
+ return new TimestampConverter();
+ case DECIMAL:
+ return new DecimalConverter(types.getScale());
+ case LIST:
+ return new ListConverter(types, avroSchema);
+ case STRUCT:
+ return new StructConverter(types, avroSchema);
+ default:
+ throw new IllegalArgumentException("Unhandled type " + types);
+ }
+ }
+
+ /**
+  * Strips union(null, T) wrappers from a schema.
+  *
+  * Unwrapping repeats while the schema is a two-branch union whose first
+  * branch is the null type; any other schema, including unions of a
+  * different shape, is returned as it stands.
+  * @param avro The avro type
+  * @return The avro type with the nullable layer removed
+  */
+ static Schema removeNullable(Schema avro) {
+   Schema current = avro;
+   for (;;) {
+     if (current.getType() != Schema.Type.UNION) {
+       return current;
+     }
+     List<Schema> branches = current.getTypes();
+     boolean nullableWrapper = branches.size() == 2 &&
+         branches.get(0).getType() == Schema.Type.NULL;
+     if (!nullableWrapper) {
+       return current;
+     }
+     current = branches.get(1);
+   }
+ }
+
+ // One converter per top-level column of the schema, built in the constructor.
+ private final AvroConverter[] converters;
+ // Avro file writer that receives every appended record.
+ private final DataFileWriter writer;
+ // Single record instance that writeBatch refills and appends for each row.
+ private final GenericRecord record;
+
+ /**
+  * Opens an Avro file for writing rows described by an ORC schema.
+  *
+  * @param path the file to create, resolved through its Hadoop FileSystem
+  * @param schema the ORC schema; its children are the top-level columns
+  * @param conf the Hadoop configuration used to look up the FileSystem
+  * @param compression the codec name passed to CodecFactory.fromString;
+  *                    null or empty leaves the writer's codec unset
+  * @throws IOException if the file cannot be created
+  */
+ public AvroWriter(Path path, TypeDescription schema,
+                   Configuration conf,
+                   String compression) throws IOException {
+   List<TypeDescription> childTypes = schema.getChildren();
+   Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema);
+   List<Schema.Field> avroFields = avroSchema.getFields();
+   converters = new AvroConverter[childTypes.size()];
+   for (int c = 0; c < converters.length; ++c) {
+     // Each field schema is nullable; converters work on the unwrapped type.
+     converters[c] = createConverter(childTypes.get(c),
+         removeNullable(avroFields.get(c).schema()));
+   }
+   GenericDatumWriter<GenericRecord> gdw =
+       new GenericDatumWriter<GenericRecord>(avroSchema);
+   writer = new DataFileWriter<GenericRecord>(gdw);
+   // Short-circuit && replaces the bitwise & the check used before.
+   if (compression != null && !"".equals(compression)) {
+     writer.setCodec(CodecFactory.fromString(compression));
+   }
+   writer.create(avroSchema, path.getFileSystem(conf).create(path));
+   record = new GenericData.Record(avroSchema);
+ }
+
+ public void writeBatch(VectorizedRowBatch batch) throws IOException {
+ for(int r=0; r < batch.size; ++r) {
+ for(int f=0; f < batch.cols.length; ++f) {
+ record.put(f, converters[f].convert(batch.cols[f], r));
+ }
+ writer.append(record);
+ }
+ }
+
+ /**
+ * Closes the underlying Avro file writer.
+ * @throws IOException if closing the writer fails
+ */
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ /**
+  * Wraps a byte array in a buffer positioned at its start.
+  *
+  * @param input the bytes to wrap; the array is shared, not copied
+  * @return the wrapping buffer after rewind()
+  */
+ static Buffer getBufferFromBytes(byte[] input) {
+   return ByteBuffer.wrap(input).rewind();
+ }
+
+ public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) {
+ if (dec == null) {
+ return null;
+ }
+
+ dec = dec.setScale(scale);
+ return getBufferFromBytes(dec.unscaledValue().toByteArray());
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java
new file mode 100644
index 0000000..e99ee8f
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Timestamp;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+public class CsvReader {
+ // Iterator over the records of the CSV file opened in the constructor.
+ private final Iterator<CSVRecord> parser;
+ // One reader per child of the schema, indexed by column position.
+ private final ColumnReader[] readers;
+
+ /**
+ * Parses one CSV cell into an entry of a column vector. The
+ * implementations in this class mark the entry null when the cell is the
+ * empty string.
+ */
+ interface ColumnReader {
+ // value: the cell text; vect: the destination column; row: the entry index.
+ void read(String value, ColumnVector vect, int row);
+ }
+
+ /** Parses a cell with Long.parseLong into a LongColumnVector entry. */
+ static class LongColumnReader implements ColumnReader {
+   public void read(String value, ColumnVector vect, int row) {
+     if (!"".equals(value)) {
+       ((LongColumnVector) vect).vector[row] = Long.parseLong(value);
+       return;
+     }
+     // An empty cell is recorded as a null entry.
+     vect.noNulls = false;
+     vect.isNull[row] = true;
+   }
+ }
+
+ /** Parses a cell with Double.parseDouble into a DoubleColumnVector entry. */
+ static class DoubleColumnReader implements ColumnReader {
+   public void read(String value, ColumnVector vect, int row) {
+     if (!"".equals(value)) {
+       ((DoubleColumnVector) vect).vector[row] = Double.parseDouble(value);
+       return;
+     }
+     // An empty cell is recorded as a null entry.
+     vect.noNulls = false;
+     vect.isNull[row] = true;
+   }
+ }
+
+ /**
+  * Stores a cell's text as bytes in a BytesColumnVector entry. The vector
+  * is given a reference to a freshly encoded array via setRef.
+  */
+ static class StringColumnReader implements ColumnReader {
+   public void read(String value, ColumnVector vect, int row) {
+     if ("".equals(value)) {
+       // An empty cell is recorded as a null entry.
+       vect.noNulls = false;
+       vect.isNull[row] = true;
+     } else {
+       BytesColumnVector vector = (BytesColumnVector) vect;
+       // Encode explicitly as UTF-8 so the stored bytes do not depend on
+       // the JVM's platform default charset.
+       byte[] bytes = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+       vector.setRef(row, bytes, 0, bytes.length);
+     }
+   }
+ }
+
+ /** Parses a cell with Timestamp.valueOf into a TimestampColumnVector entry. */
+ static class TimestampColumnReader implements ColumnReader {
+   public void read(String value, ColumnVector vect, int row) {
+     if (!"".equals(value)) {
+       ((TimestampColumnVector) vect).set(row, Timestamp.valueOf(value));
+       return;
+     }
+     // An empty cell is recorded as a null entry.
+     vect.noNulls = false;
+     vect.isNull[row] = true;
+   }
+ }
+
+ /** Parses a cell with HiveDecimal.create into a DecimalColumnVector entry. */
+ static class DecimalColumnReader implements ColumnReader {
+   public void read(String value, ColumnVector vect, int row) {
+     if (!"".equals(value)) {
+       ((DecimalColumnVector) vect).vector[row].set(HiveDecimal.create(value));
+       return;
+     }
+     // An empty cell is recorded as a null entry.
+     vect.noNulls = false;
+     vect.isNull[row] = true;
+   }
+ }
+
+ ColumnReader createReader(TypeDescription schema) {
+ switch (schema.getCategory()) {
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ return new LongColumnReader();
+ case FLOAT:
+ case DOUBLE:
+ return new DoubleColumnReader();
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return new StringColumnReader();
+ case DECIMAL:
+ return new DecimalColumnReader();
+ case TIMESTAMP:
+ return new TimestampColumnReader();
+ default:
+ throw new IllegalArgumentException("Unhandled type " + schema);
+ }
+ }
+
+ /**
+  * Opens a CSV file, optionally gzip-compressed, for batch reading.
+  *
+  * The file is parsed as RFC 4180 with a header row and decoded as UTF-8.
+  * A file whose name ends in ".gz" is decompressed while it is read.
+  *
+  * @param path the file to read, resolved through its Hadoop FileSystem
+  * @param conf the Hadoop configuration used to look up the FileSystem
+  * @param schema the row type; one reader is created per child type
+  * @throws IOException if the file cannot be opened or its header read
+  */
+ public CsvReader(Path path,
+                  Configuration conf,
+                  TypeDescription schema) throws IOException {
+   FileSystem fs = path.getFileSystem(conf);
+   FSDataInputStream raw = fs.open(path);
+   InputStream input = raw;
+   Iterator<CSVRecord> records;
+   try {
+     // Same test as comparing the text after the last dot with ".gz".
+     if (path.getName().endsWith(".gz")) {
+       input = new GZIPInputStream(raw);
+     }
+     // Decode explicitly as UTF-8 instead of the platform default charset.
+     records = new CSVParser(
+         new InputStreamReader(input, java.nio.charset.StandardCharsets.UTF_8),
+         CSVFormat.RFC4180.withHeader()).iterator();
+   } catch (IOException | RuntimeException e) {
+     // Do not leak the open file when gzip or parser setup fails.
+     try {
+       input.close();
+     } catch (IOException closeFailure) {
+       e.addSuppressed(closeFailure);
+     }
+     throw e;
+   }
+   parser = records;
+   List<TypeDescription> columnTypes = schema.getChildren();
+   readers = new ColumnReader[columnTypes.size()];
+   int c = 0;
+   for (TypeDescription columnType : columnTypes) {
+     readers[c++] = createReader(columnType);
+   }
+ }
+
+ /**
+  * Refills a batch with the next records from the CSV file.
+  *
+  * The batch is reset, then records are read until either the file is
+  * exhausted or the batch holds getMaxSize() rows. Cells are handed to the
+  * column readers by position.
+  *
+  * @param batch the batch to fill
+  * @return true if at least one row was read into the batch
+  * @throws IOException declared for callers; not thrown directly here
+  */
+ public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+   batch.reset();
+   final int capacity = batch.getMaxSize();
+   while (parser.hasNext() && batch.size < capacity) {
+     int column = 0;
+     for (String cell : parser.next()) {
+       readers[column].read(cell, batch.cols[column], batch.size);
+       ++column;
+     }
+     batch.size += 1;
+   }
+   return batch.size != 0;
+ }
+}