You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/06 00:23:52 UTC
[incubator-iceberg] branch master updated: Fix handling of null partition values (#100)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a90c7ac Fix handling of null partition values (#100)
a90c7ac is described below
commit a90c7acca6cc8aec08362b8c193f89b980a8c63d
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Mar 5 16:23:48 2019 -0800
Fix handling of null partition values (#100)
* Fix StructLikeWrapper equals and hashCode null handling.
* Spark: Fix reading null partition values.
* Add test for null partition values.
---
.../netflix/iceberg/util/StructLikeWrapper.java | 5 +-
.../com/netflix/iceberg/spark/source/Reader.java | 7 +-
.../iceberg/spark/source/TestPartitionValues.java | 130 +++++++++++++++++++++
3 files changed, 139 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java b/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java
index 7a76d58..983a8f1 100644
--- a/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java
+++ b/core/src/main/java/com/netflix/iceberg/util/StructLikeWrapper.java
@@ -20,6 +20,7 @@
package com.netflix.iceberg.util;
import com.netflix.iceberg.StructLike;
+import java.util.Objects;
/**
* Wrapper to adapt StructLike for use in maps and sets by implementing equals and hashCode.
@@ -68,7 +69,7 @@ public class StructLikeWrapper {
}
for (int i = 0; i < len; i += 1) {
- if (!struct.get(i, Object.class).equals(that.struct.get(i, Object.class))) {
+ if (!Objects.equals(struct.get(i, Object.class), that.struct.get(i, Object.class))) {
return false;
}
}
@@ -82,7 +83,7 @@ public class StructLikeWrapper {
int len = struct.size();
result = 41 * result + len;
for (int i = 0; i < len; i += 1) {
- result = 41 * result + struct.get(i, Object.class).hashCode();
+ result = 41 * result + Objects.hashCode(struct.get(i, Object.class));
}
return result;
}
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
index ed6ed21..85628de 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
@@ -488,7 +488,12 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
@Override
public InternalRow apply(StructLike tuple) {
for (int i = 0; i < types.length; i += 1) {
- reusedRow.update(i, convert(tuple.get(positions[i], javaTypes[i]), types[i]));
+ Object value = tuple.get(positions[i], javaTypes[i]);
+ if (value != null) {
+ reusedRow.update(i, convert(value, types[i]));
+ } else {
+ reusedRow.setNullAt(i);
+ }
}
return reusedRow;
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestPartitionValues.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestPartitionValues.java
new file mode 100644
index 0000000..f7c0c60
--- /dev/null
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestPartitionValues.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.Table;
+import com.netflix.iceberg.TableProperties;
+import com.netflix.iceberg.hadoop.HadoopTables;
+import com.netflix.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.File;
+import java.util.List;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestPartitionValues {
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] { "parquet" },
+ new Object[] { "avro" }
+ };
+ }
+
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get()));
+
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ .identity("data")
+ .build();
+
+ private static SparkSession spark = null;
+
+ @BeforeClass
+ public static void startSpark() {
+ TestPartitionValues.spark = SparkSession.builder().master("local[2]").getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession spark = TestPartitionValues.spark;
+ TestPartitionValues.spark = null;
+ spark.stop();
+ }
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private final String format;
+
+ public TestPartitionValues(String format) {
+ this.format = format;
+ }
+
+ @Test
+ public void testNullPartitionValue() throws Exception {
+ String desc = "null_part";
+ File parent = temp.newFolder(desc);
+ File location = new File(parent, "test");
+ File dataFolder = new File(location, "data");
+ Assert.assertTrue("mkdirs should succeed", dataFolder.mkdirs());
+
+ HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
+ Table table = tables.create(SCHEMA, SPEC, location.toString());
+ table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c"),
+ new SimpleRecord(4, null)
+ );
+
+ Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
+
+ try {
+ // TODO: incoming columns must be ordered according to the table's schema
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString());
+
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+
+ } finally {
+ TestTables.clearTables();
+ }
+ }
+
+}