You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/10/17 18:25:08 UTC
[06/15] orc git commit: more updates
more updates
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/86628bcb
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/86628bcb
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/86628bcb
Branch: refs/heads/orc-72
Commit: 86628bcbffc1d19f8f2f1fe5c840ac9d429d3dc6
Parents: 5ae2d41
Author: Owen O'Malley <om...@apache.org>
Authored: Sat Oct 1 10:24:32 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Oct 10 13:59:16 2016 -0700
----------------------------------------------------------------------
java/bench/pom.xml | 5 +
.../hadoop/hive/ql/io/orc/VectorToWritable.java | 70 -------
.../src/java/org/apache/orc/bench/AvroScan.java | 1 -
.../org/apache/orc/bench/AvroSchemaUtils.java | 190 +++++++++++++++++++
.../java/org/apache/orc/bench/AvroWriter.java | 31 +--
.../orc/bench/ColumnProjectionBenchmark.java | 1 -
.../org/apache/orc/bench/FullReadBenchmark.java | 4 +-
.../java/org/apache/orc/bench/GithubToOrc.java | 2 +-
.../java/org/apache/orc/bench/TaxiToOrc.java | 2 +-
java/pom.xml | 17 +-
10 files changed, 224 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/pom.xml
----------------------------------------------------------------------
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index 738dfb3..f0bf55a 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -46,6 +46,11 @@
<artifactId>avro</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java
deleted file mode 100644
index ae8e8da..0000000
--- a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.TypeDescription;
-
-import java.util.List;
-
-/**
- * Benchmark-only bridge that exposes some ORC reader internals through a
- * public API.
- * NOTE(review): it lives in package org.apache.hadoop.hive.ql.io.orc and uses
- * OrcStruct and RecordReaderImpl without importing them; presumably some of
- * the members it calls are not public - confirm before relying on this.
- */
-public class VectorToWritable {
- /**
- * Create a Hive ObjectInspector for an ORC schema. The schema is flattened
- * into the protobuf type list and the inspector is built starting at index 0
- * of that list (presumably the root type).
- *
- * @param schema the ORC schema to inspect
- * @return the ObjectInspector produced by OrcStruct.createObjectInspector
- */
- public static ObjectInspector createObjectInspector(TypeDescription schema) {
- // convert the type descr to protobuf types
- List<OrcProto.Type> types = OrcUtils.getOrcTypes(schema);
- // convert the protobuf types to an ObjectInspector
- return OrcStruct.createObjectInspector(0, types);
- }
-
- /**
- * Convert one row of a batch into a row object.
- * For a STRUCT schema the result is an OrcStruct with one field per child
- * type, field i being read from batch.cols[i]; previous is reused (and its
- * field count adjusted) when its class is exactly OrcStruct. For any other
- * schema only batch.cols[0] is read.
- *
- * @param batch the batch holding the row
- * @param row index of the row within the batch
- * @param schema the ORC schema of the row
- * @param previous an object to reuse, or null
- * @return the row object (possibly previous, updated in place)
- */
- public static Object createValue(VectorizedRowBatch batch,
- int row,
- TypeDescription schema,
- Object previous) {
- if(schema.getCategory() == TypeDescription.Category.STRUCT) {
- List<TypeDescription> children = schema.getChildren();
- int numberOfChildren = children.size();
- OrcStruct result;
- // reuse the caller's OrcStruct only when it is exactly that class
- if(previous != null && previous.getClass() == OrcStruct.class) {
- result = (OrcStruct)previous;
- if(result.getNumFields() != numberOfChildren) {
- result.setNumFields(numberOfChildren);
- }
- } else {
- result = new OrcStruct(numberOfChildren);
- previous = result;
- }
-
- // each field reuses its previous value object where possible
- for(int i = 0; i < numberOfChildren; ++i) {
- result.setFieldValue(i, RecordReaderImpl.nextValue(batch.cols[i], row,
- children.get(i), result.getFieldValue(i)));
- }
- } else {
- previous = RecordReaderImpl.nextValue(batch.cols[0], row, schema,
- previous);
- }
- // NOTE(review): the lone ';' below is an empty statement with no effect.
- ;
- return previous;
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/AvroScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
index fcb8fce..61f6a62 100644
--- a/java/bench/src/java/org/apache/orc/bench/AvroScan.java
+++ b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
@@ -26,7 +26,6 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.orc.TypeDescription;
public class AvroScan {
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
new file mode 100644
index 0000000..02931c3
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Converts an ORC {@link TypeDescription} into an Avro {@link Schema}.
+ * <p>
+ * Every schema returned by {@link #createAvroSchema} is nullable: it is an
+ * Avro union whose first branch is {@code null}, even for a struct.
+ */
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {
+    // No instances
+  }
+
+  /**
+   * Builds the nullable Avro schema for an ORC type.
+   * <p>
+   * Because the result is always a union, a caller that needs the record for
+   * a struct (for example to call {@code getFields()}, which Avro rejects on
+   * a union) must first select the non-null branch.
+   *
+   * @param typeInfo the ORC type to convert
+   * @return a union of {@code null} and the Avro equivalent of {@code typeInfo}
+   * @throws UnsupportedOperationException if the category has no Avro mapping
+   *     or a map has non-string keys
+   */
+  public static Schema createAvroSchema(TypeDescription typeInfo) {
+    Schema schema;
+    switch (typeInfo.getCategory()) {
+      case STRING:
+        schema = Schema.create(Schema.Type.STRING);
+        break;
+      case CHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"char\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case VARCHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"varchar\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case BINARY:
+        schema = Schema.create(Schema.Type.BYTES);
+        break;
+      // Avro's narrowest integer type is int, so byte and short widen to it.
+      case BYTE:
+      case SHORT:
+      case INT:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case LONG:
+        schema = Schema.create(Schema.Type.LONG);
+        break;
+      case FLOAT:
+        schema = Schema.create(Schema.Type.FLOAT);
+        break;
+      case DOUBLE:
+        schema = Schema.create(Schema.Type.DOUBLE);
+        break;
+      case BOOLEAN:
+        schema = Schema.create(Schema.Type.BOOLEAN);
+        break;
+      case DECIMAL:
+        schema = getSchemaFor("{" +
+            "\"type\":\"bytes\"," +
+            "\"logicalType\":\"decimal\"," +
+            "\"precision\":" + typeInfo.getPrecision() + "," +
+            "\"scale\":" + typeInfo.getScale() + "}");
+        break;
+      case DATE:
+        schema = getSchemaFor("{" +
+            "\"type\":\"int\"," +
+            "\"logicalType\":\"date\"}");
+        break;
+      case TIMESTAMP:
+        // timestamp-millis keeps only millisecond precision.
+        schema = getSchemaFor("{" +
+            "\"type\":\"long\"," +
+            "\"logicalType\":\"timestamp-millis\"}");
+        break;
+      case LIST:
+        schema = createAvroArray(typeInfo);
+        break;
+      case MAP:
+        schema = createAvroMap(typeInfo);
+        break;
+      case STRUCT:
+        schema = createAvroRecord(typeInfo);
+        break;
+      case UNION:
+        schema = createAvroUnion(typeInfo);
+        break;
+      default:
+        throw new UnsupportedOperationException(typeInfo + " is not supported.");
+    }
+    return wrapInUnionWithNull(schema);
+  }
+
+  /**
+   * Builds a (not yet nullable) Avro union from an ORC union's children.
+   * <p>
+   * Avro forbids a union directly inside a union, so each child's branches
+   * are flattened in. The children's null branches are dropped here because
+   * the caller adds exactly one null for the whole union. Note that Avro also
+   * rejects two branches of the same unnamed type, so e.g. an ORC union of
+   * tinyint and int (both Avro int) cannot be converted.
+   */
+  private static Schema createAvroUnion(TypeDescription typeInfo) {
+    List<Schema> childSchemas = new ArrayList<>();
+    for (TypeDescription childTypeInfo : typeInfo.getChildren()) {
+      Schema childSchema = createAvroSchema(childTypeInfo);
+      if (childSchema.getType() == Schema.Type.UNION) {
+        for (Schema grandkid : childSchema.getTypes()) {
+          // Test the branch itself; testing childSchema (always a UNION here)
+          // let every child's null through and produced duplicate nulls.
+          if (grandkid.getType() != Schema.Type.NULL) {
+            childSchemas.add(grandkid);
+          }
+        }
+      } else {
+        childSchemas.add(childSchema);
+      }
+    }
+    return Schema.createUnion(childSchemas);
+  }
+
+  /**
+   * Builds an Avro record with one nullable field per struct child. The
+   * record is named from the ORC column id so that nested records get
+   * distinct names; each field's doc string is the child type's text form.
+   */
+  private static Schema createAvroRecord(TypeDescription typeInfo) {
+    List<String> fieldNames = typeInfo.getFieldNames();
+    List<TypeDescription> fieldTypes = typeInfo.getChildren();
+    List<Schema.Field> childFields = new ArrayList<>(fieldNames.size());
+
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      TypeDescription childTypeInfo = fieldTypes.get(i);
+      Schema.Field field = new Schema.Field(fieldNames.get(i),
+          createAvroSchema(childTypeInfo), childTypeInfo.toString(),
+          (Object) null);
+      childFields.add(field);
+    }
+
+    Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(),
+        typeInfo.toString(), null, false);
+    recordSchema.setFields(childFields);
+    return recordSchema;
+  }
+
+  /**
+   * Builds an Avro map from an ORC map. Avro map keys are always strings, so
+   * only ORC maps with string keys are accepted; values are nullable.
+   *
+   * @throws UnsupportedOperationException if the key type is not string
+   */
+  private static Schema createAvroMap(TypeDescription typeInfo) {
+    TypeDescription keyTypeInfo = typeInfo.getChildren().get(0);
+    if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) {
+      throw new UnsupportedOperationException("Avro only supports maps with string keys "
+          + typeInfo);
+    }
+
+    Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1));
+    return Schema.createMap(valueSchema);
+  }
+
+  /** Builds an Avro array whose (nullable) items follow the list's element type. */
+  private static Schema createAvroArray(TypeDescription typeInfo) {
+    Schema child = createAvroSchema(typeInfo.getChildren().get(0));
+    return Schema.createArray(child);
+  }
+
+  /**
+   * Makes a schema nullable by putting it in a union whose first branch is
+   * {@code null}. A null schema is returned unchanged; an existing union
+   * keeps all of its non-null branches, after a single leading null.
+   */
+  private static Schema wrapInUnionWithNull(Schema schema) {
+    Schema nullSchema = Schema.create(Schema.Type.NULL);
+    switch (schema.getType()) {
+      case NULL:
+        return schema;
+      case UNION:
+        List<Schema> kids = schema.getTypes();
+        List<Schema> newKids = new ArrayList<>(kids.size() + 1);
+        newKids.add(nullSchema);
+        // Keep the original branches (previously they were dropped, turning
+        // every union into ["null"]); skip any null to avoid a duplicate.
+        for (Schema kid : kids) {
+          if (kid.getType() != Schema.Type.NULL) {
+            newKids.add(kid);
+          }
+        }
+        return Schema.createUnion(newKids);
+      default:
+        return Schema.createUnion(Arrays.asList(nullSchema, schema));
+    }
+  }
+
+  /** Parses an inline JSON schema; used for the logical types above. */
+  private static Schema getSchemaFor(String str) {
+    Schema.Parser parser = new Schema.Parser();
+    return parser.parse(str);
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
index ca0984b..094d115 100644
--- a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
+++ b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -35,11 +36,10 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.orc.TypeDescription;
import java.io.IOException;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
@@ -69,13 +69,6 @@ public class AvroWriter {
return properties;
}
- /**
- * Build the Avro schema by delegating to Hive's AvroSerDe utilities: the ORC
- * schema is turned into Properties by setHiveSchema, and
- * AvroSerdeUtils.determineSchemaOrThrowException derives the Avro schema
- * from those properties and the Configuration.
- */
- static Schema createAvroSchema(TypeDescription schema,
- Configuration conf
- ) throws IOException, AvroSerdeException {
- Properties properties = setHiveSchema(schema);
- return AvroSerdeUtils.determineSchemaOrThrowException(conf, properties);
- }
-
/**
 * Per-column conversion from one value of an ORC column vector to the object
 * stored in the Avro record.
 */
@FunctionalInterface
interface AvroConverter {
  /**
   * @param column the column vector to read from
   * @param rowIndex the row within the vector
   * @return the value to store for that row; may be null
   */
  Object convert(ColumnVector column, int rowIndex);
}
@@ -205,7 +198,7 @@ public class AvroWriter {
}
if (cv.noNulls || !cv.isNull[row]) {
DecimalColumnVector vector = (DecimalColumnVector) cv;
- return AvroSerdeUtils.getBufferFromDecimal(
+ return getBufferFromDecimal(
vector.vector[row].getHiveDecimal(), scale);
} else {
return null;
@@ -335,9 +328,9 @@ public class AvroWriter {
public AvroWriter(Path path, TypeDescription schema,
Configuration conf,
- String compression) throws IOException, AvroSerdeException {
+ String compression) throws IOException {
List<TypeDescription> childTypes = schema.getChildren();
- Schema avroSchema = createAvroSchema(schema, conf);
+ Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema);
List<Schema.Field> avroFields = avroSchema.getFields();
converters = new AvroConverter[childTypes.size()];
for(int c=0; c < converters.length; ++c) {
@@ -365,4 +358,18 @@ public class AvroWriter {
/**
 * Close the underlying Avro writer.
 *
 * @throws IOException if closing the writer fails
 */
public void close() throws IOException {
  this.writer.close();
}
+
+  /**
+   * Wrap a byte array in a buffer that is ready to be read from index 0.
+   *
+   * @param input the bytes to wrap; the array is shared, not copied
+   * @return a ByteBuffer with position 0 and limit input.length
+   */
+  static ByteBuffer getBufferFromBytes(byte[] input) {
+    // ByteBuffer.wrap already yields position 0, so the old rewind() was a
+    // no-op. Dropping it also avoids binding to ByteBuffer.rewind(), whose
+    // covariant ByteBuffer return exists only on JDK 9+ and fails with
+    // NoSuchMethodError on a JDK 8 runtime when built without --release 8.
+    return ByteBuffer.wrap(input);
+  }
+
+ public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) {
+ if (dec == null) {
+ return null;
+ }
+
+ dec = dec.setScale(scale);
+ return getBufferFromBytes(dec.unscaledValue().toByteArray());
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
index 4641108..4b17819 100644
--- a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
+++ b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
@@ -41,7 +41,6 @@ import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.iq80.snappy.SnappyInputStream;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
index 2610328..917707d 100644
--- a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
+++ b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
@@ -41,7 +41,7 @@ import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.iq80.snappy.SnappyInputStream;
+import io.airlift.compress.snappy.HadoopSnappyInputStream;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -197,7 +197,7 @@ public class FullReadBenchmark {
if ("zlib".equals(compression)) {
input = new GZIPInputStream(input);
} else if ("snappy".equals(compression)) {
- input = new SnappyInputStream(input);
+ input = new HadoopSnappyInputStream(input);
} else if (!"none".equals(compression)) {
throw new IllegalArgumentException("Unknown compression " + compression);
}
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
index 59c758f..cbc1997 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
@@ -21,7 +21,7 @@ package org.apache.orc.bench;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
index 8f66c04..dee5da6 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
@@ -21,7 +21,7 @@ package org.apache.orc.bench;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.orc.OrcFile;
import org.apache.orc.CompressionKind;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index b894b2d..9b77760 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -267,6 +267,12 @@
<version>1.8.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <classifier>hadoop2</classifier>
+ <version>1.8.1</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
@@ -365,17 +371,6 @@
<version>${storage-api.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>2.1.0</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.1</version>