Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/31 23:03:06 UTC

[iceberg] branch master updated: Spark: Support metadata columns in 3.2 (#3373)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new aeccf01  Spark: Support metadata columns in 3.2 (#3373)
aeccf01 is described below

commit aeccf01c1defd57200e1d6f54542373944c9e9da
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Sun Oct 31 16:02:55 2021 -0700

    Spark: Support metadata columns in 3.2 (#3373)
---
 .../iceberg/spark/source/SparkMetadataColumn.java  | 51 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    | 19 +++++++-
 .../iceberg/spark/source/TestSparkCatalog.java     | 19 ++++----
 .../spark/source/TestSparkMetadataColumns.java     |  6 +--
 4 files changed, 82 insertions(+), 13 deletions(-)
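
The commit wires Iceberg's metadata columns into Spark 3.2's native DataSourceV2
SupportsMetadataColumns API, so they can be selected directly by name instead of
through the test-only "table$columns" identifier workaround removed below. As a
rough usage sketch only (not part of the commit; the identifier db.sample and a
configured Iceberg catalog are assumptions), the four columns exposed by SparkTable
could be queried like this:

    import org.apache.spark.sql.SparkSession;

    public class MetadataColumnsExample {
      public static void main(String[] args) {
        // Assumes a Spark 3.2 session whose catalog resolves db.sample to an
        // Iceberg table; _spec_id, _partition, _file and _pos are the metadata
        // columns registered by SparkTable#metadataColumns() in this commit.
        SparkSession spark = SparkSession.builder().getOrCreate();
        spark.sql("SELECT _spec_id, _partition, _file, _pos FROM db.sample ORDER BY _spec_id")
            .show(false);
      }
    }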

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
new file mode 100644
index 0000000..638cb72
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.types.DataType;
+
+public class SparkMetadataColumn implements MetadataColumn {
+
+  private final String name;
+  private final DataType dataType;
+  private final boolean isNullable;
+
+  public SparkMetadataColumn(String name, DataType dataType, boolean isNullable) {
+    this.name = name;
+    this.dataType = dataType;
+    this.isNullable = isNullable;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public DataType dataType() {
+    return dataType;
+  }
+
+  @Override
+  public boolean isNullable() {
+    return isNullable;
+  }
+}
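
SparkMetadataColumn above is a plain value adapter for Spark's MetadataColumn
contract: it only reports a column name, a Spark DataType, and nullability. As a
standalone illustration (not part of the commit), the descriptor for Iceberg's
_file column can be built the same way SparkTable does in the next file:

    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.spark.source.SparkMetadataColumn;
    import org.apache.spark.sql.connector.catalog.MetadataColumn;
    import org.apache.spark.sql.types.DataTypes;

    public class SparkMetadataColumnExample {
      public static void main(String[] args) {
        // _file is exposed as a non-nullable string column.
        MetadataColumn fileColumn =
            new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false);
        System.out.println(fileColumn.name() + ": " + fileColumn.dataType().simpleString());
      }
    }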
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 2ace8dd..d1d6c7f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -24,7 +24,9 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -45,7 +47,9 @@ import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
 import org.apache.spark.sql.connector.catalog.SupportsDelete;
+import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -56,6 +60,8 @@ import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.connector.write.WriteBuilder;
 import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
@@ -69,7 +75,7 @@ import static org.apache.iceberg.TableProperties.UPDATE_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
 
 public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
-    SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge {
+    SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge, SupportsMetadataColumns {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);
 
@@ -169,6 +175,17 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
   }
 
   @Override
+  public MetadataColumn[] metadataColumns() {
+    DataType sparkPartitionType = SparkSchemaUtil.convert(Partitioning.partitionType(table()));
+    return new MetadataColumn[] {
+        new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false),
+        new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true),
+        new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false),
+        new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false)
+    };
+  }
+
+  @Override
   public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
     if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
       // skip planning the job and fetch already staged file scan tasks
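
With SupportsMetadataColumns implemented, SparkTable advertises _spec_id, _file and
_pos with fixed Spark types, while the _partition struct type is derived at runtime
from Partitioning.partitionType(table()), a struct that spans the table's partition
specs; it is the only one of the four declared nullable. As a rough sketch only
(not part of the commit), any caller holding a DSv2 Table can list the advertised
columns:

    import org.apache.spark.sql.connector.catalog.MetadataColumn;
    import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
    import org.apache.spark.sql.connector.catalog.Table;

    public class MetadataColumnInspection {
      // Prints each metadata column's name, Spark type, and nullability; Iceberg's
      // SparkTable passes the instanceof check after this commit.
      static void printMetadataColumns(Table table) {
        if (table instanceof SupportsMetadataColumns) {
          for (MetadataColumn column : ((SupportsMetadataColumns) table).metadataColumns()) {
            System.out.printf("%s %s nullable=%s%n",
                column.name(), column.dataType().simpleString(), column.isNullable());
          }
        }
      }
    }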
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 027c88c..0e52808 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,6 +19,9 @@
 
 package org.apache.iceberg.spark.source;
 
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -30,14 +33,14 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> exten
 
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
-    String[] parts = ident.name().split("\\$", 2);
-    if (parts.length == 2) {
-      TestTables.TestTable table = TestTables.load(parts[0]);
-      String[] metadataColumns = parts[1].split(",");
-      return new SparkTestTable(table, metadataColumns, false);
-    } else {
-      TestTables.TestTable table = TestTables.load(ident.name());
-      return new SparkTestTable(table, null, false);
+    TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident);
+    Namespace namespace = tableIdentifier.namespace();
+
+    TestTables.TestTable table = TestTables.load(tableIdentifier.toString());
+    if (table == null && namespace.equals(Namespace.of("default"))) {
+      table = TestTables.load(tableIdentifier.name());
     }
+
+    return new SparkTable(table, false);
   }
 }
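
The test catalog no longer parses "$"-suffixed identifiers; it converts the Spark
Identifier to an Iceberg TableIdentifier and looks the table up by its full name,
falling back to the bare name for the default namespace. As a small illustration
(hypothetical identifier, not part of the commit), the conversion behaves like this:

    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.spark.Spark3Util;
    import org.apache.spark.sql.connector.catalog.Identifier;

    public class IdentifierConversionExample {
      public static void main(String[] args) {
        // "default"."tbl" mirrors the fallback path exercised by the test catalog.
        Identifier ident = Identifier.of(new String[] {"default"}, "tbl");
        TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident);
        System.out.println(tableIdentifier);                                             // default.tbl
        System.out.println(tableIdentifier.namespace().equals(Namespace.of("default"))); // true
      }
    }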
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index b29d281..848545c 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -117,8 +117,6 @@ public class TestSparkMetadataColumns extends SparkTestBase {
     TestTables.clearTables();
   }
 
-  // TODO: remove testing workarounds once we compile against Spark 3.2
-
   @Test
   public void testSpecAndPartitionMetadataColumns() {
     // TODO: support metadata structs in vectorized ORC reads
@@ -156,7 +154,7 @@ public class TestSparkMetadataColumns extends SparkTestBase {
         row(3, row(null, 2))
     );
     assertEquals("Rows must match", expected,
-        sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME));
+        sql("SELECT _spec_id, _partition FROM %s ORDER BY _spec_id", TABLE_NAME));
   }
 
   @Test
@@ -168,7 +166,7 @@ public class TestSparkMetadataColumns extends SparkTestBase {
 
     AssertHelpers.assertThrows("Should fail to query the partition metadata column",
         ValidationException.class, "Cannot build table partition type, unknown transforms",
-        () -> sql("SELECT _partition FROM `%s$_partition`", TABLE_NAME));
+        () -> sql("SELECT _partition FROM %s", TABLE_NAME));
   }
 
   private void createAndInitTable() throws IOException {