Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/21 18:50:52 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #5108: Spark 3.2: Expose actual action classes in SparkActions

aokolnychyi opened a new pull request, #5108:
URL: https://github.com/apache/iceberg/pull/5108

   This PR exposes the actual action classes in `SparkActions` so that users no longer have to resort to explicit casts to reach methods that expose or accept `DataFrame`s.
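
   For context, a minimal sketch of the kind of call site this change enables (assuming the updated return types proposed in this PR; the `cleanUp` method and `knownFiles` variable are illustrative):

   ```java
   import org.apache.iceberg.Table;
   import org.apache.iceberg.actions.DeleteOrphanFiles;
   import org.apache.iceberg.spark.actions.SparkActions;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;

   class OrphanFileCleanupExample {

     // Previously, deleteOrphanFiles(table) was typed as the generic DeleteOrphanFiles
     // interface, so reaching the Spark-only compareToFileList(Dataset<Row>) method
     // required an explicit cast to a base class that was never meant to be public API.
     static DeleteOrphanFiles.Result cleanUp(Table table, Dataset<Row> knownFiles) {
       return SparkActions.get()
           .deleteOrphanFiles(table)        // with this PR: returns the concrete Spark action
           .compareToFileList(knownFiles)   // Dataset-based method, no cast needed
           .execute();
     }
   }
   ```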




[GitHub] [iceberg] aokolnychyi closed pull request #5108: Spark 3.2: Expose actual action classes in SparkActions

Posted by GitBox <gi...@apache.org>.
aokolnychyi closed pull request #5108: Spark 3.2: Expose actual action classes in SparkActions
URL: https://github.com/apache/iceberg/pull/5108




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5108: Spark 3.2: Expose actual action classes in SparkActions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5108:
URL: https://github.com/apache/iceberg/pull/5108#discussion_r902958198


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java:
##########
@@ -19,346 +19,52 @@
 
 package org.apache.iceberg.spark.actions;
 
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult;
-import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.hadoop.HiddenPathFilter;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.JobGroupInfo;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.UserDefinedFunction;
-import org.apache.spark.sql.functions;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.util.SerializableConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.GC_ENABLED;
-import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
 
 /**
- * An action that removes orphan metadata, data and delete files by listing a given location and comparing
- * the actual files in that location with content and metadata files referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link FileSystem}.
- * <p>
- * By default, this action cleans up the table location returned by {@link Table#location()} and
- * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
- * by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
- * For example, someone might point this action to the data folder to clean up only orphan data files.
- * <p>
- * Configure an alternative delete method using {@link #deleteWith(Consumer)}.
- * <p>
- * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} argument.  This
- * skips the directory listing - any files in the dataset provided which are not found in table metadata will
- * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
- * <p>
- * <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
- * the state of the table if another operation is writing at the same time.
+ * An action to delete orphan files.
+ *
+ * @deprecated since 0.14.0, will be removed in 1.0.0;
+ *             use {@link SparkActions} and {@link DeleteOrphanFilesSparkAction} instead.
  */
-public class BaseDeleteOrphanFilesSparkAction
-    extends BaseSparkAction<DeleteOrphanFiles, DeleteOrphanFiles.Result> implements DeleteOrphanFiles {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BaseDeleteOrphanFilesSparkAction.class);
-  private static final UserDefinedFunction filenameUDF = functions.udf((String path) -> {
-    int lastIndex = path.lastIndexOf(File.separator);
-    if (lastIndex == -1) {
-      return path;
-    } else {
-      return path.substring(lastIndex + 1);
-    }
-  }, DataTypes.StringType);
-
-  private final SerializableConfiguration hadoopConf;
-  private final int partitionDiscoveryParallelism;
-  private final Table table;
-  private final Consumer<String> defaultDelete = new Consumer<String>() {
-    @Override
-    public void accept(String file) {
-      table.io().deleteFile(file);
-    }
-  };
-
-  private String location = null;
-  private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
-  private Dataset<Row> compareToFileList;
-  private Consumer<String> deleteFunc = defaultDelete;
-  private ExecutorService deleteExecutorService = null;
+@Deprecated
+public class BaseDeleteOrphanFilesSparkAction extends DeleteOrphanFilesSparkAction {

Review Comment:
   I initially simply renamed the old actions. That was a much smaller change, but it could break users who relied on explicit casts in the past, since these base action classes were effectively public. That's why the old classes now extend the renamed ones for compatibility, carrying a deprecation warning.
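
   A hypothetical pre-existing caller, to illustrate the compatibility point (illustrative code, not part of this PR): source written against the old class name keeps compiling, with only a deprecation warning, because the old class now extends the renamed action.

   ```java
   import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;

   @SuppressWarnings("deprecation")
   class ExistingCaller {
     // A method typed against the deprecated name still compiles; it only
     // triggers a deprecation warning, which @SuppressWarnings hides here.
     void configure(BaseDeleteOrphanFilesSparkAction action, Dataset<Row> files) {
       action.compareToFileList(files);
     }
   }
   ```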



Review Comment:
   Instead of exposing the actual classes, we could add Spark-specific interfaces and expose those. However, I don't think that is a good idea: the methods we would add to such interfaces usually reflect internal implementation details that are not generic, and we probably don't want to complicate the actions API with yet another layer of interfaces. Exposing the classes seems to be the most straightforward option, but I'd prefer to rename them in that case.
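
   For comparison, a rough sketch of the rejected alternative (the interface name and shape are hypothetical, not part of this PR):

   ```java
   import org.apache.iceberg.actions.DeleteOrphanFiles;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;

   // Hypothetical Spark-specific interface layer: it would have to mirror
   // implementation-specific methods such as compareToFileList just to avoid
   // exposing the concrete class, adding one more layer to the actions API.
   interface SparkDeleteOrphanFiles extends DeleteOrphanFiles {
     SparkDeleteOrphanFiles compareToFileList(Dataset<Row> files);
   }
   ```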





[GitHub] [iceberg] aokolnychyi commented on pull request #5108: Spark 3.2: Expose actual action classes in SparkActions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5108:
URL: https://github.com/apache/iceberg/pull/5108#issuecomment-1162191395

   cc @kbendick @rdblue @flyrain @karuppayya @szehon-ho @RussellSpitzer




[GitHub] [iceberg] rdblue commented on pull request #5108: Spark 3.2: Expose actual action classes in SparkActions

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5108:
URL: https://github.com/apache/iceberg/pull/5108#issuecomment-1169398609

   @aokolnychyi, should this go into the 0.14.0 release?




[GitHub] [iceberg] rdblue commented on pull request #5108: Spark 3.2: Expose actual action classes in SparkActions

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5108:
URL: https://github.com/apache/iceberg/pull/5108#issuecomment-1165717805

   @aokolnychyi can you rebase?




[GitHub] [iceberg] aokolnychyi commented on pull request #5108: Spark 3.2: Expose actual action classes in SparkActions

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5108:
URL: https://github.com/apache/iceberg/pull/5108#issuecomment-1182192138

   Closing in favor of #5257.

