You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/12/01 08:26:44 UTC

[iceberg] branch master updated: Spark: Remove old connector APIs for row-level commands (#3635)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3797114  Spark: Remove old connector APIs for row-level commands (#3635)
3797114 is described below

commit 37971149f2ed589a6d5176e34ba8ecd8206e8cff
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Dec 1 10:26:28 2021 +0200

    Spark: Remove old connector APIs for row-level commands (#3635)
---
 .../iceberg/spark/source/SparkMergeBuilder.java    | 110 ---------------------
 .../iceberg/spark/source/SparkMergeScan.java       |  14 +--
 .../apache/iceberg/spark/source/SparkTable.java    |  31 +-----
 .../iceberg/catalog/ExtendedSupportsDelete.java    |  43 --------
 .../connector/iceberg/catalog/SupportsMerge.java   |  41 --------
 .../connector/iceberg/read/SupportsFileFilter.java |  36 -------
 .../sql/connector/iceberg/write/MergeBuilder.java  |  42 --------
 7 files changed, 2 insertions(+), 315 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
deleted file mode 100644
index ade6c2f..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeBuilder.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source;
-
-import java.util.Locale;
-import java.util.Map;
-import org.apache.iceberg.IsolationLevel;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
-import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-
-import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
-import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
-import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
-import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
-import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL_DEFAULT;
-
-class SparkMergeBuilder implements MergeBuilder {
-
-  private final SparkSession spark;
-  private final Table table;
-  private final LogicalWriteInfo writeInfo;
-  private final IsolationLevel isolationLevel;
-
-  // lazy vars
-  private ScanBuilder lazyScanBuilder;
-  private Scan configuredScan;
-  private WriteBuilder lazyWriteBuilder;
-
-  SparkMergeBuilder(SparkSession spark, Table table, String operation, LogicalWriteInfo writeInfo) {
-    this.spark = spark;
-    this.table = table;
-    this.writeInfo = writeInfo;
-    this.isolationLevel = getIsolationLevel(table.properties(), operation);
-  }
-
-  private IsolationLevel getIsolationLevel(Map<String, String> props, String operation) {
-    String isolationLevelAsString;
-    if (operation.equalsIgnoreCase("delete")) {
-      isolationLevelAsString = props.getOrDefault(DELETE_ISOLATION_LEVEL, DELETE_ISOLATION_LEVEL_DEFAULT);
-    } else if (operation.equalsIgnoreCase("update")) {
-      isolationLevelAsString = props.getOrDefault(UPDATE_ISOLATION_LEVEL, UPDATE_ISOLATION_LEVEL_DEFAULT);
-    } else if (operation.equalsIgnoreCase("merge")) {
-      isolationLevelAsString = props.getOrDefault(MERGE_ISOLATION_LEVEL, MERGE_ISOLATION_LEVEL_DEFAULT);
-    } else {
-      throw new IllegalArgumentException("Unsupported operation: " + operation);
-    }
-    return IsolationLevel.valueOf(isolationLevelAsString.toUpperCase(Locale.ROOT));
-  }
-
-  @Override
-  public ScanBuilder asScanBuilder() {
-    return scanBuilder();
-  }
-
-  private ScanBuilder scanBuilder() {
-    if (lazyScanBuilder == null) {
-      SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, writeInfo.options()) {
-        @Override
-        public Scan build() {
-          Scan scan = super.buildMergeScan();
-          SparkMergeBuilder.this.configuredScan = scan;
-          return scan;
-        }
-      };
-      // ignore residuals to ensure we read full files
-      lazyScanBuilder = scanBuilder.ignoreResiduals();
-    }
-
-    return lazyScanBuilder;
-  }
-
-  @Override
-  public WriteBuilder asWriteBuilder() {
-    return writeBuilder();
-  }
-
-  private WriteBuilder writeBuilder() {
-    if (lazyWriteBuilder == null) {
-      Preconditions.checkState(configuredScan != null, "Write must be configured after scan");
-      SparkWriteBuilder writeBuilder = new SparkWriteBuilder(spark, table, writeInfo);
-      lazyWriteBuilder = writeBuilder.overwriteFiles(configuredScan, isolationLevel);
-    }
-
-    return lazyWriteBuilder;
-  }
-}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
index ededb67..cf6a6e0 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
@@ -39,10 +38,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter;
 import org.apache.spark.sql.connector.read.Statistics;
 
-class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
+class SparkMergeScan extends SparkBatchScan {
 
   private final Table table;
   private final boolean ignoreResiduals;
@@ -89,16 +87,6 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
     return super.estimateStatistics();
   }
 
-  @Override
-  public void filterFiles(Set<String> locations) {
-    // invalidate cached tasks to trigger split planning again
-    tasks = null;
-    filteredLocations = locations;
-    files = files().stream()
-        .filter(file -> filteredLocations.contains(file.file().path().toString()))
-        .collect(Collectors.toList());
-  }
-
   // should be accessible to the write
   synchronized List<FileScanTask> files() {
     if (files == null) {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d93ce27..70b1e5d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -53,8 +53,6 @@ import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.connector.iceberg.catalog.SupportsMerge;
-import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.connector.write.WriteBuilder;
@@ -66,15 +64,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.DELETE_MODE;
-import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
-import static org.apache.iceberg.TableProperties.MERGE_MODE;
-import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
-import static org.apache.iceberg.TableProperties.UPDATE_MODE;
-import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
-
 public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
-    SupportsRead, SupportsWrite, SupportsDelete, SupportsMerge, SupportsMetadataColumns {
+    SupportsRead, SupportsWrite, SupportsDelete, SupportsMetadataColumns {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);
 
@@ -210,26 +201,6 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
   }
 
   @Override
-  public MergeBuilder newMergeBuilder(String operation, LogicalWriteInfo info) {
-    String mode = getRowLevelOperationMode(operation);
-    ValidationException.check(mode.equals("copy-on-write"), "Unsupported mode for %s: %s", operation, mode);
-    return new SparkMergeBuilder(sparkSession(), icebergTable, operation, info);
-  }
-
-  private String getRowLevelOperationMode(String operation) {
-    Map<String, String> props = icebergTable.properties();
-    if (operation.equalsIgnoreCase("delete")) {
-      return props.getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
-    } else if (operation.equalsIgnoreCase("update")) {
-      return props.getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT);
-    } else if (operation.equalsIgnoreCase("merge")) {
-      return props.getOrDefault(MERGE_MODE, MERGE_MODE_DEFAULT);
-    } else {
-      throw new IllegalArgumentException("Unsupported operation: " + operation);
-    }
-  }
-
-  @Override
   public boolean canDeleteWhere(Filter[] filters) {
     Expression deleteExpr = Expressions.alwaysTrue();
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/ExtendedSupportsDelete.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/ExtendedSupportsDelete.java
deleted file mode 100644
index e59bfe2..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/ExtendedSupportsDelete.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.catalog;
-
-import org.apache.spark.sql.connector.catalog.SupportsDelete;
-import org.apache.spark.sql.sources.Filter;
-
-// this should be part of SupportsDelete when merged upstream
-public interface ExtendedSupportsDelete extends SupportsDelete {
-  /**
-   * Checks if it is possible to delete data from a data source table that matches filter expressions.
-   * <p>
-   * Rows should be deleted from the data source iff all of the filter expressions match. That is, the
-   * expressions must be interpreted as a set of filters that are ANDed together.
-   * <p>
-   * Spark will call this method to check if the delete is possible without significant effort.
-   * Otherwise, Spark will try to rewrite the delete operation if the data source table
-   * supports row-level operations.
-   *
-   * @param filters filter expressions, used to select rows to delete when all expressions match
-   * @return true if the delete operation can be performed
-   */
-  default boolean canDeleteWhere(Filter[] filters) {
-    return true;
-  }
-}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/SupportsMerge.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/SupportsMerge.java
deleted file mode 100644
index d36fe92..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/catalog/SupportsMerge.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.catalog;
-
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.iceberg.write.MergeBuilder;
-import org.apache.spark.sql.connector.write.LogicalWriteInfo;
-
-/**
- * A mix-in interface for Table to indicate that it supports row-level operations.
- * <p>
- * This adds {@link #newMergeBuilder(String, LogicalWriteInfo)} that is used to create a scan and
- * a write for a row-level operation.
- */
-public interface SupportsMerge extends Table {
-  /**
-   * Returns a {@link MergeBuilder} which can be used to create both a scan and a write for a row-level
-   * operation. Spark will call this method to configure each data source row-level operation.
-   *
-   * @param info write info
-   * @return a merge builder
-   */
-  MergeBuilder newMergeBuilder(String operation, LogicalWriteInfo info);
-}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/read/SupportsFileFilter.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/read/SupportsFileFilter.java
deleted file mode 100644
index 1660c2a..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/read/SupportsFileFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.read;
-
-import java.util.Set;
-import org.apache.spark.sql.connector.read.Scan;
-
-/**
- * A mix-in interface for Scan. Data sources can implement this interface if they support dynamic
- * file filters.
- */
-public interface SupportsFileFilter extends Scan {
-  /**
-   * Filters this scan to query only selected files.
-   *
-   * @param locations file locations
-   */
-  void filterFiles(Set<String> locations);
-}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/write/MergeBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/write/MergeBuilder.java
deleted file mode 100644
index 8b1224b..0000000
--- a/spark/v3.2/spark/src/main/java/org/apache/spark/sql/connector/iceberg/write/MergeBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.connector.iceberg.write;
-
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-
-/**
- * An interface for building a scan and a write for a row-level operation.
- */
-public interface MergeBuilder {
-  /**
-   * Creates a scan builder for a row-level operation.
-   *
-   * @return a scan builder
-   */
-  ScanBuilder asScanBuilder();
-
-  /**
-   * Creates a write builder for a row-level operation.
-   *
-   * @return a write builder
-   */
-  WriteBuilder asWriteBuilder();
-}