You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/11 20:38:39 UTC

[incubator-iceberg] branch vectorized-read updated: Shade Arrow dependencies (#690)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/vectorized-read by this push:
     new b141bba  Shade Arrow dependencies  (#690)
b141bba is described below

commit b141bba88d5fe12709e4aab6c81197ecd0625380
Author: Gautam <ga...@gmail.com>
AuthorDate: Wed Dec 11 12:38:31 2019 -0800

    Shade Arrow dependencies  (#690)
---
 .../org/apache/iceberg/arrow/ArrowSchemaUtil.java  |  0
 .../org/apache/iceberg/arrow/ArrowUtils.scala      | 93 ++++++++++++++++++++++
 .../apache/iceberg/arrow/ArrowSchemaUtilTest.java  |  0
 build.gradle                                       | 16 +++-
 .../parquet/arrow/IcebergArrowColumnVector.java    |  2 +-
 .../parquet/arrow/NullValuesColumnVector.java      |  2 +-
 .../data/vector/VectorizedSparkParquetReaders.java |  6 +-
 versions.lock                                      |  1 +
 8 files changed, 112 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
similarity index 100%
rename from core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
rename to arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
diff --git a/arrow/src/main/scala/org/apache/iceberg/arrow/ArrowUtils.scala b/arrow/src/main/scala/org/apache/iceberg/arrow/ArrowUtils.scala
new file mode 100644
index 0000000..eec4f05
--- /dev/null
+++ b/arrow/src/main/scala/org/apache/iceberg/arrow/ArrowUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow
+
+import org.apache.arrow.memory.RootAllocator
+import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType,
+  DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType,
+  ShortType, StringType, StructField, StructType, TimestampType}
+import scala.collection.JavaConverters._
+
+/**
+ * This code is copied from Apache Spark Project's SQL ArrowUtils to be able
+  * to shade the Arrow dependencies and keep Arrow code independent from Spark
+ */
+object ArrowUtils {
+
+  val rootAllocator = new RootAllocator(Long.MaxValue)
+
+  // todo: support more types.
+  // scalastyle:off cyclomatic.complexity
+  def fromArrowType(dt: ArrowType): DataType = dt match {
+    case ArrowType.Bool.INSTANCE => BooleanType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType
+    case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType
+    case float: ArrowType.FloatingPoint if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType
+    case float: ArrowType.FloatingPoint if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType
+    case ArrowType.Utf8.INSTANCE => StringType
+    case ArrowType.Binary.INSTANCE => BinaryType
+    case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
+    case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
+    case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND => TimestampType
+    case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
+  }
+  // scalastyle:on cyclomatic.complexity
+
+  def fromArrowField(field: Field): DataType = {
+    field.getType match {
+      case ArrowType.List.INSTANCE =>
+        val elementField = field.getChildren().get(0)
+        val elementType = fromArrowField(elementField)
+        ArrayType(elementType, containsNull = elementField.isNullable)
+      case ArrowType.Struct.INSTANCE =>
+        val fields = field.getChildren().asScala.map { child =>
+          val dt = fromArrowField(child)
+          StructField(child.getName, dt, child.isNullable)
+        }
+        StructType(fields)
+      case arrowType: ArrowType => fromArrowType(arrowType)
+    }
+  }
+
+  /** Maps schema from Spark to Arrow. NOTE: timeZoneId required for TimestampType in StructType */
+  def fromArrowSchema(schema: Schema): StructType = {
+    StructType(schema.getFields.asScala.map { field =>
+      val dt = fromArrowField(field)
+      StructField(field.getName, dt, field.isNullable)
+    })
+  }
+
+  /** Return Map with conf settings to be used in ArrowPythonRunner */
+  def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
+    val timeZoneConf = if (conf.pandasRespectSessionTimeZone) {
+      Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)
+    } else {
+      Nil
+    }
+    val pandasColsByName = Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key ->
+      conf.pandasGroupedMapAssignColumnsByName.toString)
+    Map(timeZoneConf ++ pandasColsByName: _*)
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
similarity index 100%
rename from core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
rename to arrow/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java
diff --git a/build.gradle b/build.gradle
index 57bbd87..e896c34 100644
--- a/build.gradle
+++ b/build.gradle
@@ -129,10 +129,6 @@ project(':iceberg-core') {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
-    compile("org.apache.arrow:arrow-vector") {
-      exclude group: 'io.netty', module: 'netty-buffer'
-      exclude group: 'io.netty', module: 'netty-common'
-    }
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
   }
 }
@@ -237,10 +233,19 @@ project(':iceberg-parquet') {
 }
 
 project(':iceberg-arrow') {
+  apply plugin: 'scala'
+
   dependencies {
     //    compile project(':iceberg-spark')
     compile project(':iceberg-api')
 
+    compile("org.apache.arrow:arrow-vector") {
+      exclude group: 'io.netty', module: 'netty-buffer'
+      exclude group: 'io.netty', module: 'netty-common'
+    }
+    compile("org.apache.arrow:arrow-memory") {
+      exclude group: 'io.netty', module: 'netty-common'
+    }
     compileOnly("org.apache.spark:spark-hive_2.11") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
@@ -348,6 +353,9 @@ project(':iceberg-spark-runtime') {
     relocate 'org.codehaus.jackson', 'org.apache.iceberg.shaded.org.apache.parquet.shaded.org.codehaus.jackson'
     relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
     relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
+    // relocate Arrow and related deps to shade Iceberg specific version
+    relocate 'io.netty.buffer', 'org.apache.iceberg.shaded.io.netty.buffer'
+    relocate 'org.apache.arrow', 'org.apache.iceberg.shaded.org.apache.arrow'
 
     archiveName = "iceberg-spark-runtime-${version}.${extension}"
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
index 214a9a4..dbcc221 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/IcebergArrowColumnVector.java
@@ -36,6 +36,7 @@ import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.StructVector;
 import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.iceberg.arrow.ArrowUtils;
 import org.apache.iceberg.parquet.vectorized.NullabilityHolder;
 import org.apache.iceberg.parquet.vectorized.VectorHolder;
 import org.apache.parquet.Preconditions;
@@ -44,7 +45,6 @@ import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.spark.sql.execution.arrow.ArrowUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnVector;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/arrow/NullValuesColumnVector.java b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/NullValuesColumnVector.java
index eabbf40..fe62f25 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/arrow/NullValuesColumnVector.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/arrow/NullValuesColumnVector.java
@@ -22,7 +22,7 @@ package org.apache.iceberg.parquet.arrow;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
-import org.apache.spark.sql.execution.arrow.ArrowUtils;
+import org.apache.iceberg.arrow.ArrowUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
index afd6562..9e96aae 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedSparkParquetReaders.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowSchemaUtil;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
@@ -38,7 +39,6 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
-import org.apache.spark.sql.execution.arrow.ArrowUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,7 +88,9 @@ public class VectorizedSparkParquetReaders {
       this.projectedIcebergSchema = projectedIcebergSchema;
       this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema);
       this.recordsPerBatch = recordsPerBatch;
-      this.rootAllocator = ArrowUtils.rootAllocator().newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+      // this.rootAllocator = ArrowUtils.rootAllocator().newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+      this.rootAllocator = new RootAllocator(Long.MAX_VALUE)
+          .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
       LOG.info("=> [ReadBuilder] recordsPerBatch = {}", this.recordsPerBatch);
     }
 
diff --git a/versions.lock b/versions.lock
index 7856632..8f04955 100644
--- a/versions.lock
+++ b/versions.lock
@@ -67,6 +67,7 @@ io.dropwizard.metrics:metrics-json:3.1.5 (1 constraints: 1a0dc936)
 io.dropwizard.metrics:metrics-jvm:3.1.5 (1 constraints: 1a0dc936)
 io.netty:netty:3.9.9.Final (9 constraints: 9eb0396d)
 io.netty:netty-all:4.1.17.Final (3 constraints: d2312526)
+io.netty:netty-buffer:4.1.27.Final (1 constraints: 4a0fee77)
 it.unimi.dsi:fastutil:7.0.13 (1 constraints: fc0d4043)
 javax.activation:activation:1.1.1 (1 constraints: 140dbb36)
 javax.annotation:javax.annotation-api:1.2 (2 constraints: 2d21193d)