Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/12/01 08:26:44 UTC
[iceberg] branch master updated: Spark: Remove old connector APIs for row-level commands (#3635)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3797114 Spark: Remove old connector APIs for row-level commands (#3635)
3797114 is described below
commit 37971149f2ed589a6d5176e34ba8ecd8206e8cff
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Dec 1 10:26:28 2021 +0200
Spark: Remove old connector APIs for row-level commands (#3635)
---
.../iceberg/spark/source/SparkMergeBuilder.java | 110 ---------------------
.../iceberg/spark/source/SparkMergeScan.java | 14 +--
.../apache/iceberg/spark/source/SparkTable.java | 31 +-----
.../iceberg/catalog/ExtendedSupportsDelete.java | 43 --------
.../connector/iceberg/catalog/SupportsMerge.java | 41 --------
.../connector/iceberg/read/SupportsFileFilter.java | 36 -------
.../sql/connector/iceberg/write/MergeBuilder.java | 42 --------
7 files changed, 2 insertions(+), 315 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
deleted file mode 100644
index ade6c2f..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source;
-
-import java.util.Locale;
-import java.util.Map;
-import org.apache.iceberg.IsolationLevel;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
-import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-
-import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
-import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
-import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
-import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
-import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL_DEFAULT;
-
-class SparkMergeBuilder implements MergeBuilder {
-
- private final SparkSession spark;
- private final Table table;
- private final LogicalWriteInfo writeInfo;
- private final IsolationLevel isolationLevel;
-
- // lazy vars
- private ScanBuilder lazyScanBuilder;
- private Scan configuredScan;
- private WriteBuilder lazyWriteBuilder;
-
- SparkMergeBuilder(SparkSession spark, Table table, String operation, LogicalWriteInfo writeInfo) {
- this.spark = spark;
- this.table = table;
- this.writeInfo = writeInfo;
- this.isolationLevel = getIsolationLevel(table.properties(), operation);
- }
-
- private IsolationLevel getIsolationLevel(Map<String, String> props, String operation) {
- String isolationLevelAsString;
- if (operation.equalsIgnoreCase("delete")) {
- isolationLevelAsString = props.getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT);
- } else if (operation.equalsIgnoreCase("update")) {
- isolationLevelAsString = props.getOrDefault(UPDATE_ISOLATION_LEVEL, UPDATE_ISOLATION_LEVEL_DEFAULT);
- } else if (operation.equalsIgnoreCase("merge")) {
- isolationLevelAsString = props.getOrDefault(MERGE_ISOLATION_LEVEL, MERGE_ISOLATION_LEVEL_DEFAULT);
- } else {
- throw new IllegalArgumentException("Unsupported operation: " + operation);
- }
- return IsolationLevel.valueOf(isolationLevelAsString.toUpperCase(Locale.ROOT));
- }
-
- @Override
- public ScanBuilder asScanBuilder() {
- return scanBuilder();
- }
-
- private ScanBuilder scanBuilder() {
- if (lazyScanBuilder == null) {
- SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, writeInfo.options()) {
- @Override
- public Scan build() {
- Scan scan = super.buildMergeScan();
- SparkMergeBuilder.this.configuredScan = scan;
- return scan;
- }
- };
- // ignore residuals to ensure we read full files
- lazyScanBuilder = scanBuilder.ignoreResiduals();
- }
-
- return lazyScanBuilder;
- }
-
- @Override
- public WriteBuilder asWriteBuilder() {
- return writeBuilder();
- }
-
- private WriteBuilder writeBuilder() {
- if (lazyWriteBuilder == null) {
- Preconditions.checkState(configuredScan != null, "Write must be configured after scan");
- SparkWriteBuilder writeBuilder = new SparkWriteBuilder(spark, table, writeInfo);
- lazyWriteBuilder = writeBuilder.overwriteFiles(configuredScan, isolationLevel);
- }
-
- return lazyWriteBuilder;
- }
-}
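
For context, the removed SparkMergeBuilder encoded an ordering contract: the scan had to be configured before the write (the Preconditions check above enforced this), so the write could overwrite exactly the files the scan read. Below is a minimal sketch of a caller driving that contract against the interfaces as they existed before this commit; the class and method names are illustrative, not Spark planner internals.

import org.apache.spark.sql.connector.iceberg.catalog.SupportsMerge;
import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;

class MergeDriverSketch {
  // Drives a copy-on-write row-level operation: scan first, write second.
  static WriteBuilder plan(SupportsMerge table, LogicalWriteInfo info) {
    MergeBuilder mergeBuilder = table.newMergeBuilder("delete", info);
    Scan scan = mergeBuilder.asScanBuilder().build();  // 1. configure the scan
    // ... plan the job that finds affected files using `scan` ...
    return mergeBuilder.asWriteBuilder();              // 2. only then the write
  }
}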
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
index ededb67..cf6a6e0 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
@@ -39,10 +38,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter;
import org.apache.spark.sql.connector.read.Statistics;
-class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
+class SparkMergeScan extends SparkBatchScan {
private final Table table;
private final boolean ignoreResiduals;
@@ -89,16 +87,6 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
return super.estimateStatistics();
}
- @Override
- public void filterFiles(Set<String> locations) {
- // invalidate cached tasks to trigger split planning again
- tasks = null;
- filteredLocations = locations;
- files = files().stream()
- .filter(file -> filteredLocations.contains(file.file().path().toString()))
- .collect(Collectors.toList());
- }
-
// should be accessible to the write
synchronized List<FileScanTask> files() {
if (files == null) {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d93ce27..70b1e5d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -53,8 +53,6 @@ import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.connector.iceberg.catalog.SupportsMerge;
-import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
@@ -66,15 +64,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.TableProperties.DELETE_MODE;
-import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
-import static org.apache.iceberg.TableProperties.MERGE_MODE;
-import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPDATE_MODE;
-import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
-
public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
- SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge, SupportsMetadataColumns {
+ SupportsRead, SupportsWrite, SupportsDelete, SupportsMetadataColumns {
private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);
@@ -210,26 +201,6 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
}
@Override
- public MergeBuilder newMergeBuilder(String operation, LogicalWriteInfo info) {
- String mode = getRowLevelOperationMode(operation);
- ValidationException.check(mode.equals("copy-on-write"), "Unsupported mode for %s: %s", operation, mode);
- return new SparkMergeBuilder(sparkSession(), icebergTable, operation, info);
- }
-
- private String getRowLevelOperationMode(String operation) {
- Map<String, String> props = icebergTable.properties();
- if (operation.equalsIgnoreCase("delete")) {
- return props.getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
- } else if (operation.equalsIgnoreCase("update")) {
- return props.getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT);
- } else if (operation.equalsIgnoreCase("merge")) {
- return props.getOrDefault(MERGE_MODE, MERGE_MODE_DEFAULT);
- } else {
- throw new IllegalArgumentException("Unsupported operation: " + operation);
- }
- }
-
- @Override
public boolean canDeleteWhere(Filter[] filters) {
Expression deleteExpr = Expressions.alwaysTrue();
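
The removed getRowLevelOperationMode resolved the mode for each operation from table properties (write.delete.mode, write.update.mode, write.merge.mode, each defaulting to copy-on-write), and newMergeBuilder rejected anything other than copy-on-write; isolation levels came from sibling properties the same way. A minimal sketch of configuring those properties on an Iceberg table follows; the helper name is illustrative.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class RowLevelConfigSketch {
  // Sets the properties the removed code read: the operation mode and the
  // isolation level used when committing the overwrite.
  static void forceCopyOnWriteDeletes(Table table) {
    table.updateProperties()
        .set(TableProperties.DELETE_MODE, "copy-on-write")           // write.delete.mode
        .set(TableProperties.DELETE_ISOLATION_LEVEL, "serializable") // write.delete.isolation-level
        .commit();
  }
}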
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/ExtendedSupportsDelete.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/ExtendedSupportsDelete.java
deleted file mode 100644
index e59bfe2..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/ExtendedSupportsDelete.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.catalog;
-
-import org.apache.spark.sql.connector.catalog.SupportsDelete;
-import org.apache.spark.sql.sources.Filter;
-
-// this should be part of SupportsDelete when merged upstream
-public interface ExtendedSupportsDelete extends SupportsDelete {
- /**
- * Checks if it is possible to delete data from a data source table that matches filter expressions.
- * <p>
- * Rows should be deleted from the data source iff all of the filter expressions match. That is, the
- * expressions must be interpreted as a set of filters that are ANDed together.
- * <p>
- * Spark will call this method to check if the delete is possible without significant effort.
- * Otherwise, Spark will try to rewrite the delete operation if the data source table
- * supports row-level operations.
- *
- * @param filters filter expressions, used to select rows to delete when all expressions match
- * @return true if the delete operation can be performed
- */
- default boolean canDeleteWhere(Filter[] filters) {
- return true;
- }
-}
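
The Javadoc above defines the contract: the filters are ANDed together, and returning true claims the whole delete for the source. Below is a hypothetical decision helper in the spirit of that contract, accepting a delete only when every filter references partition columns alone (so it can be answered by dropping files, without a row-level rewrite); the class and its fields are illustrative, not part of Iceberg or Spark.

import java.util.Set;
import org.apache.spark.sql.sources.Filter;

class PartitionOnlyDeleteCheck {
  private final Set<String> partitionColumns;

  PartitionOnlyDeleteCheck(Set<String> partitionColumns) {
    this.partitionColumns = partitionColumns;
  }

  // Mirrors the canDeleteWhere contract: filters are ANDed, so the delete is
  // accepted only if every referenced column is a partition column.
  boolean canDeleteWhere(Filter[] filters) {
    for (Filter filter : filters) {
      for (String ref : filter.references()) {
        if (!partitionColumns.contains(ref)) {
          return false; // let Spark fall back to a row-level rewrite
        }
      }
    }
    return true;
  }
}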
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/SupportsMerge.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/SupportsMerge.java
deleted file mode 100644
index d36fe92..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/SupportsMerge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.catalog;
-
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-
-/**
- * A mix-in interface for Table to indicate that it supports row-level operations.
- * <p>
- * This adds {@link #newMergeBuilder(String, LogicalWriteInfo)} that is used to create a scan and
- * a write for a row-level operation.
- */
-public interface SupportsMerge extends Table {
- /**
- * Returns a {@link MergeBuilder} which can be used to create both a scan and a write for a row-level
- * operation. Spark will call this method to configure each data source row-level operation.
- *
- * @param info write info
- * @return a merge builder
- */
- MergeBuilder newMergeBuilder(String operation, LogicalWriteInfo info);
-}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/read/SupportsFileFilter.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/read/SupportsFileFilter.java
deleted file mode 100644
index 1660c2a..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/read/SupportsFileFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.read;
-
-import java.util.Set;
-import org.apache.spark.sql.connector.read.Scan;
-
-/**
- * A mix-in interface for Scan. Data sources can implement this interface if they support dynamic
- * file filters.
- */
-public interface SupportsFileFilter extends Scan {
- /**
- * Filters this scan to query only selected files.
- *
- * @param locations file locations
- */
- void filterFiles(Set<String> locations);
-}
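
The removed SparkMergeScan.filterFiles earlier in this commit shows the implementation side of this interface; on the caller side, the hook supported runtime file filtering: first find which files contain matching rows, then narrow the scan to exactly those files. A minimal caller-side sketch, with illustrative names:

import java.util.Set;
import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter;
import org.apache.spark.sql.connector.read.Scan;

class DynamicFileFilterSketch {
  // Narrows a scan to the files a first pass found matches in, so a
  // copy-on-write job rewrites only those files.
  static void narrowScan(Scan scan, Set<String> matchedFileLocations) {
    if (scan instanceof SupportsFileFilter) {
      ((SupportsFileFilter) scan).filterFiles(matchedFileLocations);
    }
  }
}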
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/write/MergeBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/write/MergeBuilder.java
deleted file mode 100644
index 8b1224b..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/write/MergeBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.write;
-
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-
-/**
- * An interface for building a scan and a write for a row-level operation.
- */
-public interface MergeBuilder {
- /**
- * Creates a scan builder for a row-level operation.
- *
- * @return a scan builder
- */
- ScanBuilder asScanBuilder();
-
- /**
- * Creates a write builder for a row-level operation.
- *
- * @return a write builder
- */
- WriteBuilder asWriteBuilder();
-}