Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/06 00:23:52 UTC

[incubator-iceberg] branch master updated: Fix handling of null partition values (#100)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a90c7ac  Fix handling of null partition values (#100)
a90c7ac is described below

commit a90c7acca6cc8aec08362b8c193f89b980a8c63d
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Mar 5 16:23:48 2019 -0800

    Fix handling of null partition values (#100)
    
    * Fix StructLikeWrapper equals and hashCode null handling.
    * Spark: Fix reading null partition values.
    * Add test for null partition values.
---
 .../netflix/iceberg/util/StructLikeWrapper.java    |   5 +-
 .../com/netflix/iceberg/spark/source/Reader.java   |   7 +-
 .../iceberg/spark/source/TestPartitionValues.java  | 130 +++++++++++++++++++++
 3 files changed, 139 insertions(+), 3 deletions(-)
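
The heart of the fix is visible in the first hunk below: direct calls to equals() and hashCode() throw NullPointerException when the receiver is null, while the java.util.Objects helpers treat null as an ordinary value. A minimal standalone illustration of the difference (a demo sketch, not part of the commit):

    import java.util.Objects;

    public class NullSafeDemo {
      public static void main(String[] args) {
        Object a = null;
        Object b = "value";

        // a.equals(b) and a.hashCode() would both throw NullPointerException.
        System.out.println(Objects.equals(a, b));       // false
        System.out.println(Objects.equals(null, null)); // true
        System.out.println(Objects.hashCode(a));        // 0 for a null argument
      }
    }

This is why a partition tuple containing a null value could crash StructLikeWrapper before this change.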

diff --git a/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java b/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java
index 7a76d58..983a8f1 100644
--- a/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java
+++ b/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java
@@ -20,6 +20,7 @@
 package com.netflix.iceberg.util;
 
 import com.netflix.iceberg.StructLike;
+import java.util.Objects;
 
 /**
  * Wrapper to adapt StructLike for use in maps and sets by implementing equals and hashCode.
@@ -68,7 +69,7 @@ public class StructLikeWrapper {
     }
 
     for (int i = 0; i < len; i += 1) {
-      if (!struct.get(i, Object.class).equals(that.struct.get(i, Object.class))) {
+      if (!Objects.equals(struct.get(i, Object.class), that.struct.get(i, Object.class))) {
         return false;
       }
     }
@@ -82,7 +83,7 @@ public class StructLikeWrapper {
     int len = struct.size();
     result = 41 * result + len;
     for (int i = 0; i < len; i += 1) {
-      result = 41 * result + struct.get(i, Object.class).hashCode();
+      result = 41 * result + Objects.hashCode(struct.get(i, Object.class));
     }
     return result;
   }
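
As the class Javadoc above says, StructLikeWrapper exists so that StructLike tuples (which do not define equals/hashCode themselves) can be used as keys in maps and sets. A hedged usage sketch, assuming the static wrap(StructLike) factory this version of the class exposes; the tuple(...) helper is invented for the demo:

    import com.netflix.iceberg.StructLike;
    import com.netflix.iceberg.util.StructLikeWrapper;
    import java.util.HashMap;
    import java.util.Map;

    public class WrapperDemo {
      // Demo-only StructLike backed by an Object array.
      static StructLike tuple(Object... values) {
        return new StructLike() {
          @Override
          public int size() { return values.length; }

          @Override
          public <T> T get(int pos, Class<T> javaClass) {
            return javaClass.cast(values[pos]);
          }

          @Override
          public <T> void set(int pos, T value) { values[pos] = value; }
        };
      }

      public static void main(String[] args) {
        Map<StructLikeWrapper, Long> counts = new HashMap<>();
        // Before this commit, a null partition value here triggered an NPE
        // inside equals()/hashCode(); now null tuples are ordinary keys.
        counts.merge(StructLikeWrapper.wrap(tuple((Object) null)), 1L, Long::sum);
        counts.merge(StructLikeWrapper.wrap(tuple((Object) null)), 1L, Long::sum);
        System.out.println(counts.size()); // 1: equal null tuples collapse
      }
    }
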
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
index ed6ed21..85628de 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
@@ -488,7 +488,12 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     @Override
     public InternalRow apply(StructLike tuple) {
       for (int i = 0; i < types.length; i += 1) {
-        reusedRow.update(i, convert(tuple.get(positions[i], javaTypes[i]), types[i]));
+        Object value = tuple.get(positions[i], javaTypes[i]);
+        if (value != null) {
+          reusedRow.update(i, convert(value, types[i]));
+        } else {
+          reusedRow.setNullAt(i);
+        }
       }
 
       return reusedRow;
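
In isolation, the pattern the Reader hunk adopts: test the value before handing it to the converter, and mark the slot on the reused row null via setNullAt rather than pushing null through convert(). A standalone sketch against Spark's GenericInternalRow; the convert() stub here is a hypothetical stand-in for Reader's private converter:

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
    import org.apache.spark.unsafe.types.UTF8String;

    public class NullAwareRowFill {
      // Hypothetical stand-in: like Reader's convert(), it would NPE on null.
      static Object convert(Object value) {
        return UTF8String.fromString(value.toString());
      }

      public static void main(String[] args) {
        GenericInternalRow reusedRow = new GenericInternalRow(2);
        Object[] partitionTuple = { "a", null };

        for (int i = 0; i < partitionTuple.length; i += 1) {
          Object value = partitionTuple[i];
          if (value != null) {
            reusedRow.update(i, convert(value));
          } else {
            reusedRow.setNullAt(i);  // record null instead of converting it
          }
        }

        System.out.println(reusedRow.isNullAt(1)); // true
      }
    }
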
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestPartitionValues.java
new file mode 100644
index 0000000..f7c0c60
--- /dev/null
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestPartitionValues.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.Table;
+import com.netflix.iceberg.TableProperties;
+import com.netflix.iceberg.hadoop.HadoopTables;
+import com.netflix.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.File;
+import java.util.List;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestPartitionValues {
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet" },
+        new Object[] { "avro" }
+    };
+  }
+
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+      .identity("data")
+      .build();
+
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestPartitionValues.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession spark = TestPartitionValues.spark;
+    TestPartitionValues.spark = null;
+    spark.stop();
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private final String format;
+
+  public TestPartitionValues(String format) {
+    this.format = format;
+  }
+
+  @Test
+  public void testNullPartitionValue() throws Exception {
+    String desc = "null_part";
+    File parent = temp.newFolder(desc);
+    File location = new File(parent, "test");
+    File dataFolder = new File(location, "data");
+    Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+    HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+    Table table = tables.create(SCHEMA, SPEC, location.toString());
+    table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+    List<SimpleRecord> expected = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, null)
+    );
+
+    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+    try {
+      // TODO: incoming columns must be ordered according to the table's schema
+      df.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(location.toString());
+
+      Dataset<Row> result = spark.read()
+          .format("iceberg")
+          .load(location.toString());
+
+      List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+
+      Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+      Assert.assertEquals("Result rows should match", expected, actual);
+
+    } finally {
+      TestTables.clearTables();
+    }
+  }
+
+}
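
A note on the test's dependencies: it leans on a SimpleRecord bean (not part of this diff) both for Encoders.bean and for the final assertEquals on the collected rows, so the bean needs a no-arg constructor, getters/setters, and value-based equals. A plausible sketch of that class, purely an assumption about code that lives elsewhere in the Spark test sources:

    import java.util.Objects;

    public class SimpleRecord {
      private Integer id;
      private String data;

      public SimpleRecord() {}  // required by Encoders.bean

      public SimpleRecord(Integer id, String data) {
        this.id = id;
        this.data = data;
      }

      public Integer getId() { return id; }
      public void setId(Integer id) { this.id = id; }
      public String getData() { return data; }
      public void setData(String data) { this.data = data; }

      @Override
      public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SimpleRecord that = (SimpleRecord) o;
        // Objects.equals keeps the null "data" value comparable, which the
        // null-partition assertion depends on.
        return Objects.equals(id, that.id) && Objects.equals(data, that.data);
      }

      @Override
      public int hashCode() {
        return Objects.hash(id, data);
      }
    }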