You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/31 23:03:06 UTC
[iceberg] branch master updated: Spark: Support metadata columns in 3.2 (#3373)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new aeccf01 Spark: Support metadata columns in 3.2 (#3373)
aeccf01 is described below
commit aeccf01c1defd57200e1d6f54542373944c9e9da
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Sun Oct 31 16:02:55 2021 -0700
Spark: Support metadata columns in 3.2 (#3373)
---
.../iceberg/spark/source/SparkMetadataColumn.java | 51 ++++++++++++++++++++++
.../apache/iceberg/spark/source/SparkTable.java | 19 +++++++-
.../iceberg/spark/source/TestSparkCatalog.java | 19 ++++----
.../spark/source/TestSparkMetadataColumns.java | 6 +--
4 files changed, 82 insertions(+), 13 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
new file mode 100644
index 0000000..638cb72
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.types.DataType;
+
+public class SparkMetadataColumn implements MetadataColumn {
+
+ private final String name;
+ private final DataType dataType;
+ private final boolean isNullable;
+
+ public SparkMetadataColumn(String name, DataType dataType, boolean isNullable) {
+ this.name = name;
+ this.dataType = dataType;
+ this.isNullable = isNullable;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public DataType dataType() {
+ return dataType;
+ }
+
+ @Override
+ public boolean isNullable() {
+ return isNullable;
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 2ace8dd..d1d6c7f 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -24,7 +24,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -45,7 +47,9 @@ import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
import org.apache.spark.sql.connector.catalog.SupportsDelete;
+import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -56,6 +60,8 @@ import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
@@ -69,7 +75,7 @@ import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
- SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge {
+ SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge, SupportsMetadataColumns {
private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);
@@ -169,6 +175,17 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
}
@Override
+ public MetadataColumn[] metadataColumns() {
+ DataType sparkPartitionType = SparkSchemaUtil.convert(Partitioning.partitionType(table()));
+ return new MetadataColumn[] {
+ new SparkMetadataColumn(MetadataColumns.SPEC_ID.name(), DataTypes.IntegerType, false),
+ new SparkMetadataColumn(MetadataColumns.PARTITION_COLUMN_NAME, sparkPartitionType, true),
+ new SparkMetadataColumn(MetadataColumns.FILE_PATH.name(), DataTypes.StringType, false),
+ new SparkMetadataColumn(MetadataColumns.ROW_POSITION.name(), DataTypes.LongType, false)
+ };
+ }
+
+ @Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
// skip planning the job and fetch already staged file scan tasks
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 027c88c..0e52808 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,6 +19,9 @@
package org.apache.iceberg.spark.source;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -30,14 +33,14 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> exten
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
- String[] parts = ident.name().split("\\$", 2);
- if (parts.length == 2) {
- TestTables.TestTable table = TestTables.load(parts[0]);
- String[] metadataColumns = parts[1].split(",");
- return new SparkTestTable(table, metadataColumns, false);
- } else {
- TestTables.TestTable table = TestTables.load(ident.name());
- return new SparkTestTable(table, null, false);
+ TableIdentifier tableIdentifier = Spark3Util.identifierToTableIdentifier(ident);
+ Namespace namespace = tableIdentifier.namespace();
+
+ TestTables.TestTable table = TestTables.load(tableIdentifier.toString());
+ if (table == null && namespace.equals(Namespace.of("default"))) {
+ table = TestTables.load(tableIdentifier.name());
}
+
+ return new SparkTable(table, false);
}
}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index b29d281..848545c 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -117,8 +117,6 @@ public class TestSparkMetadataColumns extends SparkTestBase {
TestTables.clearTables();
}
- // TODO: remove testing workarounds once we compile against Spark 3.2
-
@Test
public void testSpecAndPartitionMetadataColumns() {
// TODO: support metadata structs in vectorized ORC reads
@@ -156,7 +154,7 @@ public class TestSparkMetadataColumns extends SparkTestBase {
row(3, row(null, 2))
);
assertEquals("Rows must match", expected,
- sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME));
+ sql("SELECT _spec_id, _partition FROM %s ORDER BY _spec_id", TABLE_NAME));
}
@Test
@@ -168,7 +166,7 @@ public class TestSparkMetadataColumns extends SparkTestBase {
AssertHelpers.assertThrows("Should fail to query the partition metadata column",
ValidationException.class, "Cannot build table partition type, unknown transforms",
- () -> sql("SELECT _partition FROM `%s$_partition`", TABLE_NAME));
+ () -> sql("SELECT _partition FROM %s", TABLE_NAME));
}
private void createAndInitTable() throws IOException {