Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/21 18:51:39 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5108: Spark 3.2: Expose actual action classes in SparkActions

aokolnychyi commented on code in PR #5108:
URL: https://github.com/apache/iceberg/pull/5108#discussion_r902958198


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -19,346 +19,52 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HiddenPathFilter;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 /**
- * An action that removes orphan metadata, data and delete files by listing a given location and comparing
- * the actual files in that location with content and metadata files referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link FileSystem}.
- * <p>
- * By default, this action cleans up the table location returned by {@link Table#location()} and
- * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
- * by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
- * For example, someone might point this action to the data folder to clean up only orphan data files.
- * <p>
- * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
- * <p>
- * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} argument.  This
- * skips the directory listing - any files in the dataset provided which are not found in table metadata will
- * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
- * <p>
- * <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
- * the state of the table if another operation is writing at the same time.
+ * An action to delete orphan files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link DeleteOrphanFilesSparkAction} instead.
  */
-public class BaseDeleteOrphanFilesSparkAction
-    extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
-  private final SerializableConfiguration hadoopConf;
-  private final int partitionDiscoveryParallelism;
-  private final Table table;
-  private final Consumer<String> defaultDelete = new Consumer<String>() {
-    @Override
-    public void accept(String file) {
-      table.io().deleteFile(file);
-    }
-  };
-
-  private String location = null;
-  private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
-  private Dataset<Row> compareToFileList;
-  private Consumer<String> deleteFunc = defaultDelete;
-  private ExecutorService deleteExecutorService = null;
+@Deprecated
+public class BaseDeleteOrphanFilesSparkAction extends DeleteOrphanFilesSparkAction {

Review Comment:
   Initially, I simply renamed the old actions. That was a much smaller change, but it could break anyone who explicitly cast to these base action classes in the past, since they were effectively public. That's why the old base classes now extend the renamed classes: this keeps them compatible while adding a deprecation warning.
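
   To illustrate the compatibility concern, here is a minimal sketch with simplified stand-in classes, not the real Iceberg code. The class names mirror the ones in this diff, but the bodies, the `deleteOrphanFiles()` factory, and the returned file name are hypothetical. In particular, having the factory return an instance of the deprecated subclass is an assumption made here so that an old explicit cast still succeeds at runtime.

```java
import java.util.Collections;
import java.util.List;

public class CompatShimSketch {

  // Stand-in for the generic action interface.
  interface DeleteOrphanFiles {
    List<String> execute();
  }

  // The renamed class that is now exposed directly (hypothetical body).
  static class DeleteOrphanFilesSparkAction implements DeleteOrphanFiles {
    @Override
    public List<String> execute() {
      return Collections.singletonList("orphan-1.parquet");
    }
  }

  // The old name survives as an empty, deprecated subclass of the renamed class.
  @Deprecated
  static class BaseDeleteOrphanFilesSparkAction extends DeleteOrphanFilesSparkAction {
  }

  // Assumption: the declared return type is the new class, while the runtime
  // type is the deprecated subclass, so casts to the old name keep working.
  static DeleteOrphanFilesSparkAction deleteOrphanFiles() {
    return new BaseDeleteOrphanFilesSparkAction();
  }

  public static void main(String[] args) {
    // Old caller code that cast the generic interface to the base class.
    DeleteOrphanFiles action = deleteOrphanFiles();
    BaseDeleteOrphanFilesSparkAction legacy = (BaseDeleteOrphanFilesSparkAction) action;
    System.out.println(legacy.execute());

    // New caller code that uses the renamed class without any cast.
    DeleteOrphanFilesSparkAction current = deleteOrphanFiles();
    System.out.println(current.execute());
  }
}
```

   With a plain rename and no shim, the old caller above would stop compiling because `BaseDeleteOrphanFilesSparkAction` would no longer exist.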



Review Comment:
   Instead of exposing the actual classes, we could add Spark-specific interfaces and expose those. However, I don't think that is a good idea. The methods we would add to those interfaces usually reflect internal implementation details that are not generic. In addition, we probably don't want to complicate the actions API by adding another layer of interfaces. Exposing the classes seems to be the most straightforward option, but I'd prefer to rename them in that case.
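
   A rough sketch of what exposing the classes can look like, again with simplified stand-ins rather than the real Iceberg API. `ActionsProvider`, the `String` parameter standing in for a Spark `Dataset<Row>`, and the returned summary string are hypothetical. The idea is that the provider narrows its return type to the concrete class, so Spark-only methods such as `compareToFileList` can be chained without a cast and without an extra interface layer.

```java
public class ExposeClassSketch {

  // Generic, engine-independent action interface (stand-in).
  interface DeleteOrphanFiles {
    DeleteOrphanFiles olderThan(long timestampMillis);

    String execute();
  }

  // Generic provider interface (hypothetical name) that returns the generic action.
  interface ActionsProvider {
    DeleteOrphanFiles deleteOrphanFiles();
  }

  // Concrete Spark action with one method that is not on the generic interface.
  static class DeleteOrphanFilesSparkAction implements DeleteOrphanFiles {
    private long olderThanMillis = 0L;
    private String source = "directory-listing";

    // Covariant override: returns the concrete type so chaining keeps it.
    @Override
    public DeleteOrphanFilesSparkAction olderThan(long timestampMillis) {
      this.olderThanMillis = timestampMillis;
      return this;
    }

    // Spark-specific option; a String stands in for a Dataset<Row> here.
    public DeleteOrphanFilesSparkAction compareToFileList(String fileListName) {
      this.source = fileListName;
      return this;
    }

    @Override
    public String execute() {
      return "olderThan=" + olderThanMillis + ", source=" + source;
    }
  }

  // Spark provider narrows the return type to the concrete class.
  static class SparkActions implements ActionsProvider {
    @Override
    public DeleteOrphanFilesSparkAction deleteOrphanFiles() {
      return new DeleteOrphanFilesSparkAction();
    }
  }

  public static void main(String[] args) {
    // No cast is needed to reach the Spark-specific method.
    String summary = new SparkActions()
        .deleteOrphanFiles()
        .olderThan(1000L)
        .compareToFileList("candidate_files")
        .execute();
    System.out.println(summary);
  }
}
```

   Callers that only hold an `ActionsProvider` still see the generic `DeleteOrphanFiles` type, so the generic API itself is unchanged in this sketch.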



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

