You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/10/17 18:25:08 UTC

[06/15] orc git commit: more updates

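More updates: drop the hive-exec dependency from the benchmarks by adding AvroSchemaUtils (ORC TypeDescription to Avro Schema), adding avro-mapred (hadoop2), removing VectorToWritable, switching Snappy decoding to io.airlift.compress's HadoopSnappyInputStream, and using org.apache.orc.OrcFile.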


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/86628bcb
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/86628bcb
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/86628bcb

Branch: refs/heads/orc-72
Commit: 86628bcbffc1d19f8f2f1fe5c840ac9d429d3dc6
Parents: 5ae2d41
Author: Owen O'Malley <om...@apache.org>
Authored: Sat Oct 1 10:24:32 2016 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Oct 10 13:59:16 2016 -0700

----------------------------------------------------------------------
 java/bench/pom.xml                              |   5 +
 .../hadoop/hive/ql/io/orc/VectorToWritable.java |  70 -------
 .../src/java/org/apache/orc/bench/AvroScan.java |   1 -
 .../org/apache/orc/bench/AvroSchemaUtils.java   | 190 +++++++++++++++++++
 .../java/org/apache/orc/bench/AvroWriter.java   |  31 +--
 .../orc/bench/ColumnProjectionBenchmark.java    |   1 -
 .../org/apache/orc/bench/FullReadBenchmark.java |   4 +-
 .../java/org/apache/orc/bench/GithubToOrc.java  |   2 +-
 .../java/org/apache/orc/bench/TaxiToOrc.java    |   2 +-
 java/pom.xml                                    |  17 +-
 10 files changed, 224 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/pom.xml
----------------------------------------------------------------------
diff --git a/java/bench/pom.xml b/java/bench/pom.xml
index 738dfb3..f0bf55a 100644
--- a/java/bench/pom.xml
+++ b/java/bench/pom.xml
@@ -46,6 +46,11 @@
       <artifactId>avro</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>hadoop2</classifier>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-csv</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java b/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java
deleted file mode 100644
index ae8e8da..0000000
--- a/java/bench/src/java/org/apache/hadoop/hive/ql/io/orc/VectorToWritable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.orc;
-
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.TypeDescription;
-
-import java.util.List;
-
-/**
- * This class is just here to provide a public API to some of the ORC internal
- * methods.
- */
-public class VectorToWritable {
-  public static ObjectInspector createObjectInspector(TypeDescription schema) {
-    // convert the type descr to protobuf types
-    List<OrcProto.Type> types = OrcUtils.getOrcTypes(schema);
-    // convert the protobuf types to an ObjectInspector
-    return OrcStruct.createObjectInspector(0, types);
-  }
-
-  public static Object createValue(VectorizedRowBatch batch,
-                                   int row,
-                                   TypeDescription schema,
-                                   Object previous) {
-    if(schema.getCategory() == TypeDescription.Category.STRUCT) {
-      List<TypeDescription> children = schema.getChildren();
-      int numberOfChildren = children.size();
-      OrcStruct result;
-      if(previous != null && previous.getClass() == OrcStruct.class) {
-        result = (OrcStruct)previous;
-        if(result.getNumFields() != numberOfChildren) {
-          result.setNumFields(numberOfChildren);
-        }
-      } else {
-        result = new OrcStruct(numberOfChildren);
-        previous = result;
-      }
-
-      for(int i = 0; i < numberOfChildren; ++i) {
-        result.setFieldValue(i, RecordReaderImpl.nextValue(batch.cols[i], row,
-            children.get(i), result.getFieldValue(i)));
-      }
-    } else {
-      previous = RecordReaderImpl.nextValue(batch.cols[0], row, schema,
-          previous);
-    }
-    ;
-    return previous;
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/AvroScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
index fcb8fce..61f6a62 100644
--- a/java/bench/src/java/org/apache/orc/bench/AvroScan.java
+++ b/java/bench/src/java/org/apache/orc/bench/AvroScan.java
@@ -26,7 +26,6 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.TypeDescription;
 
 public class AvroScan {
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
new file mode 100644
index 0000000..02931c3
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench;
+
+import org.apache.avro.Schema;
+import org.apache.orc.TypeDescription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Converts an ORC {@link TypeDescription} into the equivalent Avro
+ * {@link Schema}.
+ * <p>
+ * Every schema returned by {@link #createAvroSchema(TypeDescription)} is
+ * nullable: it is an Avro union whose first branch is {@code null}.
+ */
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {
+    // No instances
+  }
+
+  /**
+   * Builds the nullable Avro schema for an ORC type.
+   *
+   * @param typeInfo the ORC type to convert
+   * @return a union of {@code null} and the converted type; for an ORC union
+   *     the result is {@code null} followed by each converted branch
+   * @throws UnsupportedOperationException if the type has no Avro equivalent
+   *     here, such as a map whose key is not a string
+   */
+  public static Schema createAvroSchema(TypeDescription typeInfo) {
+    Schema schema;
+    switch (typeInfo.getCategory()) {
+      case STRING:
+        schema = Schema.create(Schema.Type.STRING);
+        break;
+      case CHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"char\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case VARCHAR:
+        schema = getSchemaFor("{" +
+            "\"type\":\"string\"," +
+            "\"logicalType\":\"varchar\"," +
+            "\"maxLength\":" + typeInfo.getMaxLength() + "}");
+        break;
+      case BINARY:
+        schema = Schema.create(Schema.Type.BYTES);
+        break;
+      case BYTE:
+      case SHORT:
+      case INT:
+        // Avro has no 8- or 16-bit integer type, so these all widen to int.
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case LONG:
+        schema = Schema.create(Schema.Type.LONG);
+        break;
+      case FLOAT:
+        schema = Schema.create(Schema.Type.FLOAT);
+        break;
+      case DOUBLE:
+        schema = Schema.create(Schema.Type.DOUBLE);
+        break;
+      case BOOLEAN:
+        schema = Schema.create(Schema.Type.BOOLEAN);
+        break;
+      case DECIMAL:
+        String precision = String.valueOf(typeInfo.getPrecision());
+        String scale = String.valueOf(typeInfo.getScale());
+        schema = getSchemaFor("{" +
+            "\"type\":\"bytes\"," +
+            "\"logicalType\":\"decimal\"," +
+            "\"precision\":" + precision + "," +
+            "\"scale\":" + scale + "}");
+        break;
+      case DATE:
+        schema = getSchemaFor("{" +
+            "\"type\":\"int\"," +
+            "\"logicalType\":\"date\"}");
+        break;
+      case TIMESTAMP:
+        schema = getSchemaFor("{" +
+            "\"type\":\"long\"," +
+            "\"logicalType\":\"timestamp-millis\"}");
+        break;
+      case LIST:
+        schema = createAvroArray(typeInfo);
+        break;
+      case MAP:
+        schema = createAvroMap(typeInfo);
+        break;
+      case STRUCT:
+        schema = createAvroRecord(typeInfo);
+        break;
+      case UNION:
+        schema = createAvroUnion(typeInfo);
+        break;
+      default:
+        throw new UnsupportedOperationException(typeInfo + " is not supported.");
+    }
+
+    return wrapInUnionWithNull(schema);
+  }
+
+  /**
+   * Builds an Avro union from the branches of an ORC union type.
+   * <p>
+   * Each converted branch is itself a nullable union, so it is flattened and
+   * its {@code null} branch dropped; {@link #createAvroSchema} adds a single
+   * {@code null} back. Note that Avro rejects unions with two branches of the
+   * same unnamed type, so e.g. {@code uniontype<tinyint,int>} (both become
+   * int) cannot be converted.
+   */
+  private static Schema createAvroUnion(TypeDescription typeInfo) {
+    List<Schema> childSchemas = new ArrayList<>();
+    for (TypeDescription childTypeInfo : typeInfo.getChildren()) {
+      Schema childSchema = createAvroSchema(childTypeInfo);
+      if (childSchema.getType() == Schema.Type.UNION) {
+        for (Schema grandkid : childSchema.getTypes()) {
+          // Test the branch itself: childSchema is always a UNION here.
+          if (grandkid.getType() != Schema.Type.NULL) {
+            childSchemas.add(grandkid);
+          }
+        }
+      } else {
+        childSchemas.add(childSchema);
+      }
+    }
+
+    return Schema.createUnion(childSchemas);
+  }
+
+  /**
+   * Builds an Avro record with one nullable field per ORC struct field.
+   * The record is named {@code record_<id>} after the ORC type id, and each
+   * field's doc string is the ORC type of that field.
+   */
+  private static Schema createAvroRecord(TypeDescription typeInfo) {
+    List<Schema.Field> childFields = new ArrayList<>();
+
+    List<String> fieldNames = typeInfo.getFieldNames();
+    List<TypeDescription> fieldTypes = typeInfo.getChildren();
+
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      TypeDescription childTypeInfo = fieldTypes.get(i);
+      Schema.Field field = new Schema.Field(fieldNames.get(i),
+          createAvroSchema(childTypeInfo), childTypeInfo.toString(),
+          (Object) null);
+      childFields.add(field);
+    }
+
+    Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(),
+        typeInfo.toString(), null, false);
+    recordSchema.setFields(childFields);
+    return recordSchema;
+  }
+
+  /**
+   * Builds an Avro map from an ORC map. Avro map keys are always strings, so
+   * only ORC maps with string keys are accepted.
+   *
+   * @throws UnsupportedOperationException if the key type is not string
+   */
+  private static Schema createAvroMap(TypeDescription typeInfo) {
+    TypeDescription keyTypeInfo = typeInfo.getChildren().get(0);
+    if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) {
+      throw new UnsupportedOperationException("Avro only supports maps with string keys "
+          + typeInfo);
+    }
+
+    Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1));
+
+    return Schema.createMap(valueSchema);
+  }
+
+  /** Builds an Avro array whose (nullable) items are the ORC list's element type. */
+  private static Schema createAvroArray(TypeDescription typeInfo) {
+    Schema child = createAvroSchema(typeInfo.getChildren().get(0));
+    return Schema.createArray(child);
+  }
+
+  /**
+   * Makes a schema nullable by giving it a {@code null} first branch. An
+   * existing union is extended rather than nested, because Avro does not
+   * allow a union to directly contain another union.
+   */
+  private static Schema wrapInUnionWithNull(Schema schema) {
+    Schema nullSchema = Schema.create(Schema.Type.NULL);
+    switch (schema.getType()) {
+      case NULL:
+        return schema;
+      case UNION:
+        List<Schema> kids = schema.getTypes();
+        List<Schema> newKids = new ArrayList<>(kids.size() + 1);
+        newKids.add(nullSchema);
+        for (Schema kid : kids) {
+          // Keep every existing branch, but never a second null.
+          if (kid.getType() != Schema.Type.NULL) {
+            newKids.add(kid);
+          }
+        }
+        return Schema.createUnion(newKids);
+      default:
+        return Schema.createUnion(Arrays.asList(nullSchema, schema));
+    }
+  }
+
+  /** Parses a JSON Avro schema, used for types that need logical-type attributes. */
+  private static Schema getSchemaFor(String str) {
+    Schema.Parser parser = new Schema.Parser();
+    return parser.parse(str);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
index ca0984b..094d115 100644
--- a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
+++ b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -35,11 +36,10 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.orc.TypeDescription;
 
 import java.io.IOException;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Properties;
@@ -69,13 +69,6 @@ public class AvroWriter {
     return properties;
   }
 
-  static Schema createAvroSchema(TypeDescription schema,
-                                 Configuration conf
-                                 ) throws IOException, AvroSerdeException {
-    Properties properties = setHiveSchema(schema);
-    return AvroSerdeUtils.determineSchemaOrThrowException(conf, properties);
-  }
-
   interface AvroConverter {
     Object convert(ColumnVector vector, int row);
   }
@@ -205,7 +198,7 @@ public class AvroWriter {
       }
       if (cv.noNulls || !cv.isNull[row]) {
         DecimalColumnVector vector = (DecimalColumnVector) cv;
-        return AvroSerdeUtils.getBufferFromDecimal(
+        return getBufferFromDecimal(
             vector.vector[row].getHiveDecimal(), scale);
       } else {
         return null;
@@ -335,9 +328,9 @@ public class AvroWriter {
 
   public AvroWriter(Path path, TypeDescription schema,
                     Configuration conf,
-                    String compression) throws IOException, AvroSerdeException {
+                    String compression) throws IOException {
     List<TypeDescription> childTypes = schema.getChildren();
-    Schema avroSchema = createAvroSchema(schema, conf);
+    Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema);
     List<Schema.Field> avroFields = avroSchema.getFields();
     converters = new AvroConverter[childTypes.size()];
     for(int c=0; c < converters.length; ++c) {
@@ -365,4 +358,29 @@ public class AvroWriter {
   public void close() throws IOException {
     writer.close();
   }
+
+  /**
+   * Wraps the bytes in a buffer for Avro. {@link ByteBuffer#wrap(byte[])}
+   * already leaves the position at zero, so no rewind is needed.
+   */
+  static ByteBuffer getBufferFromBytes(byte[] input) {
+    return ByteBuffer.wrap(input);
+  }
+
+  /**
+   * Encodes a decimal for Avro's decimal logical type: the toByteArray()
+   * bytes of its unscaled value after rescaling to {@code scale}.
+   *
+   * @param dec the value to encode; may be null
+   * @param scale the scale to encode at (presumably the column's declared scale)
+   * @return the unscaled value's bytes, or null when {@code dec} is null
+   */
+  public static ByteBuffer getBufferFromDecimal(HiveDecimal dec, int scale) {
+    if (dec == null) {
+      return null;
+    }
+
+    dec = dec.setScale(scale);
+    return getBufferFromBytes(dec.unscaledValue().toByteArray());
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
index 4641108..4b17819 100644
--- a/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
+++ b/java/bench/src/java/org/apache/orc/bench/ColumnProjectionBenchmark.java
@@ -41,7 +41,6 @@ import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.iq80.snappy.SnappyInputStream;
 import org.openjdk.jmh.annotations.AuxCounters;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
index 2610328..917707d 100644
--- a/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
+++ b/java/bench/src/java/org/apache/orc/bench/FullReadBenchmark.java
@@ -41,7 +41,7 @@ import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.iq80.snappy.SnappyInputStream;
+import io.airlift.compress.snappy.HadoopSnappyInputStream;
 import org.openjdk.jmh.annotations.AuxCounters;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -197,7 +197,7 @@ public class FullReadBenchmark {
     if ("zlib".equals(compression)) {
       input = new GZIPInputStream(input);
     } else if ("snappy".equals(compression)) {
-      input = new SnappyInputStream(input);
+      input = new HadoopSnappyInputStream(input);
     } else if (!"none".equals(compression)) {
       throw new IllegalArgumentException("Unknown compression " + compression);
     }

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
index 59c758f..cbc1997 100644
--- a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java
@@ -21,7 +21,7 @@ package org.apache.orc.bench;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
index 8f66c04..dee5da6 100644
--- a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
+++ b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java
@@ -21,7 +21,7 @@ package org.apache.orc.bench;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.orc.OrcFile;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;

http://git-wip-us.apache.org/repos/asf/orc/blob/86628bcb/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index b894b2d..9b77760 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -267,6 +267,12 @@
         <version>1.8.1</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-mapred</artifactId>
+        <classifier>hadoop2</classifier>
+        <version>1.8.1</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
@@ -365,17 +371,6 @@
         <version>${storage-api.version}</version>
       </dependency>
       <dependency>
-        <groupId>org.apache.hive</groupId>
-        <artifactId>hive-exec</artifactId>
-        <version>2.1.0</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.apache.calcite</groupId>
-            <artifactId>calcite-core</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
         <groupId>org.codehaus.jettison</groupId>
         <artifactId>jettison</artifactId>
         <version>1.1</version>