You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/17 16:45:13 UTC

[iceberg] branch 0.13.x updated: Spark: Fix NPEs in Spark value converter (#4663) (#4781)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/0.13.x by this push:
     new c0228a9a7 Spark: Fix NPEs in Spark value converter (#4663) (#4781)
c0228a9a7 is described below

commit c0228a9a72db3e3d282726cd7207b4497c3cf9ae
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Tue May 17 18:45:08 2022 +0200

    Spark: Fix NPEs in Spark value converter (#4663) (#4781)
    
    Co-authored-by: Edgar Rodriguez <ed...@airbnb.com>
---
 .../apache/iceberg/spark/SparkValueConverter.java  | 32 ++++++--
 .../iceberg/spark/TestSparkValueConverter.java     | 90 ++++++++++++++++++++++
 .../apache/iceberg/spark/SparkValueConverter.java  |  4 +
 .../iceberg/spark/TestSparkValueConverter.java     | 90 ++++++++++++++++++++++
 .../apache/iceberg/spark/SparkValueConverter.java  | 11 ++-
 .../iceberg/spark/TestSparkValueConverter.java     | 90 ++++++++++++++++++++++
 .../apache/iceberg/spark/SparkValueConverter.java  | 11 ++-
 .../iceberg/spark/TestSparkValueConverter.java     | 90 ++++++++++++++++++++++
 8 files changed, 408 insertions(+), 10 deletions(-)

diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
index ef453c0ce..150ef9ad2 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
@@ -58,20 +58,32 @@ public class SparkValueConverter {
       case LIST:
         List<Object> convertedList = Lists.newArrayList();
         List<?> list = (List<?>) object;
-        for (Object element : list) {
-          convertedList.add(convert(type.asListType().elementType(), element));
+        try {
+          for (Object element : list) {
+            convertedList.add(convert(type.asListType().elementType(), element));
+          }
+          return convertedList;
+        } catch (NullPointerException npe) {
+          // Scala 2.11 fix: Catch NPE as internal value could be null and scala wrapper does not
+          // evaluate until iteration.
+          return null;
         }
-        return convertedList;
 
       case MAP:
         Map<Object, Object> convertedMap = Maps.newLinkedHashMap();
         Map<?, ?> map = (Map<?, ?>) object;
-        for (Map.Entry<?, ?> entry : map.entrySet()) {
-          convertedMap.put(
-              convert(type.asMapType().keyType(), entry.getKey()),
-              convert(type.asMapType().valueType(), entry.getValue()));
+        try {
+          for (Map.Entry<?, ?> entry : map.entrySet()) {
+            convertedMap.put(
+                convert(type.asMapType().keyType(), entry.getKey()),
+                convert(type.asMapType().valueType(), entry.getValue()));
+          }
+          return convertedMap;
+        } catch (NullPointerException npe) {
+          // Scala 2.11 fix: Catch NPE as internal value could be null and scala wrapper does not
+          // evaluate until iteration.
+          return null;
         }
-        return convertedMap;
 
       case DATE:
         return DateTimeUtils.fromJavaDate((Date) object);
@@ -95,6 +107,10 @@ public class SparkValueConverter {
   }
 
   private static Record convert(Types.StructType struct, Row row) {
+    if (row == null) {
+      return null;
+    }
+
     Record record = GenericRecord.create(struct);
     List<Types.NestedField> fields = struct.fields();
     for (int i = 0; i < fields.size(); i += 1) {
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
new file mode 100644
index 000000000..57941b8c7
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSparkValueConverter {
+  @Test
+  public void testSparkNullMapConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7,
+            Types.StringType.get(),
+            Types.StructType.of(
+                Types.NestedField.required(1, "lat", Types.FloatType.get()),
+                Types.NestedField.required(2, "long", Types.FloatType.get())
+            )
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullListConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations",
+            Types.ListType.ofOptional(6, Types.StringType.get())
+        )
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullStructConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullPrimitiveConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StringType.get())
+    );
+    assertCorrectNullConversion(schema);
+  }
+
+  private void assertCorrectNullConversion(Schema schema) {
+    Row sparkRow = RowFactory.create(1, null);
+    Record record = GenericRecord.create(schema);
+    record.set(0, 1);
+    Assert.assertEquals("Round-trip conversion should produce original value",
+        record,
+        SparkValueConverter.convert(schema, sparkRow));
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
index ef453c0ce..b3e6b2f48 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
@@ -95,6 +95,10 @@ public class SparkValueConverter {
   }
 
   private static Record convert(Types.StructType struct, Row row) {
+    if (row == null) {
+      return null;
+    }
+
     Record record = GenericRecord.create(struct);
     List<Types.NestedField> fields = struct.fields();
     for (int i = 0; i < fields.size(); i += 1) {
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
new file mode 100644
index 000000000..57941b8c7
--- /dev/null
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSparkValueConverter {
+  @Test
+  public void testSparkNullMapConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7,
+            Types.StringType.get(),
+            Types.StructType.of(
+                Types.NestedField.required(1, "lat", Types.FloatType.get()),
+                Types.NestedField.required(2, "long", Types.FloatType.get())
+            )
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullListConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations",
+            Types.ListType.ofOptional(6, Types.StringType.get())
+        )
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullStructConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullPrimitiveConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StringType.get())
+    );
+    assertCorrectNullConversion(schema);
+  }
+
+  private void assertCorrectNullConversion(Schema schema) {
+    Row sparkRow = RowFactory.create(1, null);
+    Record record = GenericRecord.create(schema);
+    record.set(0, 1);
+    Assert.assertEquals("Round-trip conversion should produce original value",
+        record,
+        SparkValueConverter.convert(schema, sparkRow));
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
index ef453c0ce..8a7920351 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
@@ -95,6 +95,10 @@ public class SparkValueConverter {
   }
 
   private static Record convert(Types.StructType struct, Row row) {
+    if (row == null) {
+      return null;
+    }
+
     Record record = GenericRecord.create(struct);
     List<Types.NestedField> fields = struct.fields();
     for (int i = 0; i < fields.size(); i += 1) {
@@ -107,7 +111,12 @@ public class SparkValueConverter {
           record.set(i, convert(fieldType.asStructType(), row.getStruct(i)));
           break;
         case LIST:
-          record.set(i, convert(fieldType.asListType(), row.getList(i)));
+          try {
+            record.set(i, convert(fieldType.asListType(), row.getList(i)));
+          } catch (NullPointerException npe) {
+            // Handle https://issues.apache.org/jira/browse/SPARK-37654
+            record.set(i, convert(fieldType.asListType(), null));
+          }
           break;
         case MAP:
           record.set(i, convert(fieldType.asMapType(), row.getJavaMap(i)));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
new file mode 100644
index 000000000..57941b8c7
--- /dev/null
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSparkValueConverter {
+  @Test
+  public void testSparkNullMapConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7,
+            Types.StringType.get(),
+            Types.StructType.of(
+                Types.NestedField.required(1, "lat", Types.FloatType.get()),
+                Types.NestedField.required(2, "long", Types.FloatType.get())
+            )
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullListConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations",
+            Types.ListType.ofOptional(6, Types.StringType.get())
+        )
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullStructConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullPrimitiveConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StringType.get())
+    );
+    assertCorrectNullConversion(schema);
+  }
+
+  private void assertCorrectNullConversion(Schema schema) {
+    Row sparkRow = RowFactory.create(1, null);
+    Record record = GenericRecord.create(schema);
+    record.set(0, 1);
+    Assert.assertEquals("Round-trip conversion should produce original value",
+        record,
+        SparkValueConverter.convert(schema, sparkRow));
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
index ef453c0ce..8a7920351 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkValueConverter.java
@@ -95,6 +95,10 @@ public class SparkValueConverter {
   }
 
   private static Record convert(Types.StructType struct, Row row) {
+    if (row == null) {
+      return null;
+    }
+
     Record record = GenericRecord.create(struct);
     List<Types.NestedField> fields = struct.fields();
     for (int i = 0; i < fields.size(); i += 1) {
@@ -107,7 +111,12 @@ public class SparkValueConverter {
           record.set(i, convert(fieldType.asStructType(), row.getStruct(i)));
           break;
         case LIST:
-          record.set(i, convert(fieldType.asListType(), row.getList(i)));
+          try {
+            record.set(i, convert(fieldType.asListType(), row.getList(i)));
+          } catch (NullPointerException npe) {
+            // Handle https://issues.apache.org/jira/browse/SPARK-37654
+            record.set(i, convert(fieldType.asListType(), null));
+          }
           break;
         case MAP:
           record.set(i, convert(fieldType.asMapType(), row.getJavaMap(i)));
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
new file mode 100644
index 000000000..57941b8c7
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkValueConverter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSparkValueConverter {
+  @Test
+  public void testSparkNullMapConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations", Types.MapType.ofOptional(6, 7,
+            Types.StringType.get(),
+            Types.StructType.of(
+                Types.NestedField.required(1, "lat", Types.FloatType.get()),
+                Types.NestedField.required(2, "long", Types.FloatType.get())
+            )
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullListConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "locations",
+            Types.ListType.ofOptional(6, Types.StringType.get())
+        )
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullStructConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StructType.of(
+            Types.NestedField.required(1, "lat", Types.FloatType.get()),
+            Types.NestedField.required(2, "long", Types.FloatType.get())
+        ))
+    );
+
+    assertCorrectNullConversion(schema);
+  }
+
+  @Test
+  public void testSparkNullPrimitiveConvert() {
+    Schema schema = new Schema(
+        Types.NestedField.required(0, "id", Types.LongType.get()),
+        Types.NestedField.optional(5, "location", Types.StringType.get())
+    );
+    assertCorrectNullConversion(schema);
+  }
+
+  private void assertCorrectNullConversion(Schema schema) {
+    Row sparkRow = RowFactory.create(1, null);
+    Record record = GenericRecord.create(schema);
+    record.set(0, 1);
+    Assert.assertEquals("Round-trip conversion should produce original value",
+        record,
+        SparkValueConverter.convert(schema, sparkRow));
+  }
+}