You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/18 09:29:25 UTC

[GitHub] [iceberg] zhangjun0x01 opened a new pull request #1624: extract some common functions to iceberg-core

zhangjun0x01 opened a new pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624


   extract some common functions to iceberg-core,so that we can do RewriteDataFilesAction for flink and spark,avoid duplication of code.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r513164959



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -111,171 +49,17 @@ protected RewriteDataFilesAction self() {
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
+  protected FileIO fileIO() {
+    return SparkUtil.serializableFileIO(table());

Review comment:
       This fileIO() method should be a setter for field fileIO. This method only needs to be called once when BaseRewriteDataFilesAction is initialized. Other places need to refer to fileIO and only need to call its getter method, 
   
   If we rename fileIO() to setFileIO(), it might be better understood, but I don’t know if this conforms to iceberg's specification.
   
   ```
     // BaseRewriteDataFilesAction.java 
     protected FileIO fileIO() {
       return fileIO;
     }
   
     protected abstract FileIO setFileIO();
   ```
   
   ```
     // RewriteDataFilesAction.java 
     @Override
     protected FileIO setFileIO() {
       return SparkUtil.serializableFileIO(table());
     }
   ```
   
   so I think the method will not be called multiple times. What do you think ?@openinx




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] simonsssu commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
simonsssu commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714177418


   @zhangjun0x01 I think it's a little tricky. What about your table contains 3 DataFiles and each of them was 90MB ? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r509130060



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -19,263 +19,64 @@
 
 package org.apache.iceberg.actions;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.source.RowDataRewriter;
-import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.StructLikeWrapper;
-import org.apache.iceberg.util.TableScanUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class RewriteDataFilesAction
-    extends BaseSnapshotUpdateAction<RewriteDataFilesAction, RewriteDataFilesActionResult> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesAction.class);
+    extends RewriteDataFilesActionBase<RewriteDataFilesAction, RewriteDataFilesActionResult> {
 
   private final JavaSparkContext sparkContext;
-  private final Table table;
-  private final FileIO fileIO;
-  private final EncryptionManager encryptionManager;
-  private final boolean caseSensitive;
-  private long targetSizeInBytes;
-  private int splitLookback;
-  private long splitOpenFileCost;
-
-  private PartitionSpec spec = null;
-  private Expression filter;
 
   RewriteDataFilesAction(SparkSession spark, Table table) {
+    super(table);
     this.sparkContext = new JavaSparkContext(spark.sparkContext());
-    this.table = table;
-    this.spec = table.spec();
-    this.filter = Expressions.alwaysTrue();
-    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
-
-    long splitSize = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.SPLIT_SIZE,
-        TableProperties.SPLIT_SIZE_DEFAULT);
-    long targetFileSize = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
-
-    this.splitLookback = PropertyUtil.propertyAsInt(
-        table.properties(),
-        TableProperties.SPLIT_LOOKBACK,
-        TableProperties.SPLIT_LOOKBACK_DEFAULT);
-    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.SPLIT_OPEN_FILE_COST,
-        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
-
-    this.fileIO = SparkUtil.serializableFileIO(table);
-    this.encryptionManager = table.encryption();
+    this.setCaseSensitive(Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false")));
   }
 
   @Override
   protected RewriteDataFilesAction self() {
     return this;
   }
 
-  @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
-  }
-
   @Override
   public RewriteDataFilesActionResult execute() {
-    CloseableIterable<FileScanTask> fileScanTasks = null;
-    try {
-      fileScanTasks = table.newScan()
-          .caseSensitive(caseSensitive)
-          .ignoreResiduals()
-          .filter(filter)
-          .planFiles();
-    } finally {
-      try {
-        if (fileScanTasks != null) {
-          fileScanTasks.close();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterable", ioe);
-      }
-    }
-
-    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
-    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
-        .filter(kv -> kv.getValue().size() > 1)
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = getFilteredGroupedTasks();
 
     // Nothing to rewrite if there's only one DataFile in each partition.
     if (filteredGroupedTasks.isEmpty()) {
       return RewriteDataFilesActionResult.empty();
     }
 
     // Split and combine tasks under each partition
-    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
-        .map(scanTasks -> {
-          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
-              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
-          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
-        })
-        .flatMap(Streams::stream)
-        .collect(Collectors.toList());
+    List<CombinedScanTask> combinedScanTasks = getCombinedScanTasks(filteredGroupedTasks);
 
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
 
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
+    Broadcast<FileIO> io = sparkContext.broadcast(SparkUtil.serializableFileIO(this.table()));
+    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(this.table().encryption());
 
-    RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
+    RowDataRewriter rowDataRewriter =
+        new RowDataRewriter(this.table(), this.table().spec(), isCaseSensitive(), io, encryption);
 
     List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);

Review comment:
       thanks for your suggestion,I extract execute method to RewriteDataFilesActionBase, and add a abstract method rewriteDataForTasks in RewriteDataFilesActionBase.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r512751745



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -111,171 +49,17 @@ protected RewriteDataFilesAction self() {
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
+  protected FileIO fileIO() {
+    return SparkUtil.serializableFileIO(table());

Review comment:
       should we make this lazy? 
   ```java
   private FileIO lazyFileIO;
   protected FileIO fileIO { 
      if (lazyFileIO == null) { 
        lazyFileIO =...
      }
     return lazyFileIo
   }
   ```
   }




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx merged pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r512359342



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseRewriteDataFilesAction<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private boolean caseSensitive;
+  private PartitionSpec spec;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  public BaseRewriteDataFilesAction(Table table) {
+    this.table = table;
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    this.fileIO = fileIO(table);
+    this.encryptionManager = table.encryption();
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  protected FileIO fileIO() {
+    return fileIO;
+  }
+
+  protected void caseSensitive(boolean newCaseSensitive) {
+    this.caseSensitive = newCaseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the target rewrite data file size in bytes
+   *
+   * @param targetSize size in bytes of rewrite data file
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> targetSizeInBytes(long targetSize) {
+    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
+        targetSize);
+    this.targetSizeInBytes = targetSize;
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+
+  public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  @Override
+  public RewriteDataFilesActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .filter(filter)
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
+        .filter(kv -> kv.getValue().size() > 1)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    // Nothing to rewrite if there's only one DataFile in each partition.
+    if (filteredGroupedTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+    // Split and combine tasks under each partition
+    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
+        .map(scanTasks -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
+          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
+        })
+        .flatMap(Streams::stream)
+        .collect(Collectors.toList());
+
+    List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
+    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
+        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+        .collect(Collectors.toList());
+    replaceDataFiles(currentDataFiles, addedDataFiles);
+
+    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
+  }
+
+
+  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+    try (CloseableIterator<FileScanTask> iterator = tasksIter) {
+      iterator.forEachRemaining(task -> {
+        StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
+        tasksGroupedByPartition.put(structLike, task);
+      });
+    } catch (IOException e) {
+      LOG.warn("Failed to close task iterator", e);
+    }
+    return tasksGroupedByPartition.asMap();
+  }
+
+  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+      commit(rewriteFiles);
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+          .run(fileIO::deleteFile);
+      throw e;
+    }
+  }
+
+  protected abstract FileIO fileIO(Table icebergTable);

Review comment:
       ok,I deleted the icebergTable argument 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r510582159



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseRewriteDataFilesAction<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private PartitionSpec spec;
+  private boolean caseSensitive;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  public BaseRewriteDataFilesAction(Table table) {
+    this.table = table;
+    this.fileIO = table.io();
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+  }
+
+  protected void caseSensitive(boolean newCaseSensitive) {
+    this.caseSensitive = newCaseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+

Review comment:
       nit:  here we don't need an empty line.

##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseRewriteDataFilesAction<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private boolean caseSensitive;
+  private PartitionSpec spec;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  public BaseRewriteDataFilesAction(Table table) {
+    this.table = table;
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    this.fileIO = fileIO(table);
+    this.encryptionManager = table.encryption();
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  protected FileIO fileIO() {
+    return fileIO;
+  }
+
+  protected void caseSensitive(boolean newCaseSensitive) {
+    this.caseSensitive = newCaseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the target rewrite data file size in bytes
+   *
+   * @param targetSize size in bytes of rewrite data file
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> targetSizeInBytes(long targetSize) {
+    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
+        targetSize);
+    this.targetSizeInBytes = targetSize;
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+

Review comment:
       nit:  useless empty line ? 

##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseRewriteDataFilesAction<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private boolean caseSensitive;
+  private PartitionSpec spec;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  public BaseRewriteDataFilesAction(Table table) {
+    this.table = table;
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    this.fileIO = fileIO(table);
+    this.encryptionManager = table.encryption();
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  protected FileIO fileIO() {
+    return fileIO;
+  }
+
+  protected void caseSensitive(boolean newCaseSensitive) {
+    this.caseSensitive = newCaseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the target rewrite data file size in bytes
+   *
+   * @param targetSize size in bytes of rewrite data file
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> targetSizeInBytes(long targetSize) {
+    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
+        targetSize);
+    this.targetSizeInBytes = targetSize;
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public BaseRewriteDataFilesAction<ThisT> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+
+  public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  @Override
+  public RewriteDataFilesActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .filter(filter)
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
+        .filter(kv -> kv.getValue().size() > 1)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    // Nothing to rewrite if there's only one DataFile in each partition.
+    if (filteredGroupedTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+    // Split and combine tasks under each partition
+    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
+        .map(scanTasks -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
+          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
+        })
+        .flatMap(Streams::stream)
+        .collect(Collectors.toList());
+
+    List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
+    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
+        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
+        .collect(Collectors.toList());
+    replaceDataFiles(currentDataFiles, addedDataFiles);
+
+    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
+  }
+
+
+  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+    try (CloseableIterator<FileScanTask> iterator = tasksIter) {
+      iterator.forEachRemaining(task -> {
+        StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
+        tasksGroupedByPartition.put(structLike, task);
+      });
+    } catch (IOException e) {
+      LOG.warn("Failed to close task iterator", e);
+    }
+    return tasksGroupedByPartition.asMap();
+  }
+
+  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+      commit(rewriteFiles);
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+          .run(fileIO::deleteFile);
+      throw e;
+    }
+  }
+
+  protected abstract FileIO fileIO(Table icebergTable);

Review comment:
       We've already had a `table()` method before, do we need the extra `icebergTable` argument ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508190713



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private PartitionSpec spec = null;
+  private boolean caseSensitive;
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    fileIO = table.io();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.encryptionManager = table.encryption();
+    spec = table.spec();

Review comment:
       ditto.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-716916998


   Sorry I haven't had a chance to look at this yet. I'll look tomorrow morning


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r512729015



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BaseAction<R> implements Action<R> {
+
+  protected abstract Table table();
+
+  protected String metadataTableName(MetadataTableType type) {
+    return metadataTableName(table().name(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
+    if (tableName.contains("/")) {
+      return tableName + "#" + type;
+    } else if (tableName.startsWith("hadoop.")) {
+      // for HadoopCatalog tables, use the table location to load the metadata table
+      // because IcebergCatalog uses HiveCatalog when the table is identified by name
+      return table().location() + "#" + type;
+    } else if (tableName.startsWith("hive.")) {
+      // HiveCatalog prepend a logical name which we need to drop for Spark 2.4
+      return tableName.replaceFirst("hive\\.", "") + "." + type;

Review comment:
       This I think may end up having to be split up for Spark and Flink once we start having to deal with multiple catalogs in Spark. I think this extraction is still ok because we can extend and override this method. But I think this method is going to end up being implementation specific based on what the parsers can handle.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] simonsssu commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
simonsssu commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714262488


   @zhangjun0x01 Hi ZhangJun, Thanks for your explanation. We can not make sure that the second streaming task will generated a lot of small files again. The purpose you want to add this is you consider there will have 3 DataFiles which are all 120MB then they will regenerate, If I have the first Datafile 120MB, and then I generated a 5MB datafile in the second time , it will also trigger to merge to a 125MB.  Is it the same as what you have done now?  


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-712249285


   If we are moving the "Action" class perhaps it belongs in the API module rather than core (although BaseAction probably still belongs in core?)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r513132466



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -111,171 +49,17 @@ protected RewriteDataFilesAction self() {
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
+  protected FileIO fileIO() {
+    return SparkUtil.serializableFileIO(table());

Review comment:
       we could make it lazy, I looked up the code which uses SparkUtil#serializableFileIO method, they are all directly call, so I think make it lazy may not make much sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r513170173



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -111,171 +49,17 @@ protected RewriteDataFilesAction self() {
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
+  protected FileIO fileIO() {
+    return SparkUtil.serializableFileIO(table());

Review comment:
       OK, sound like we've a local field to initialize the fileIO, this method will only call once now. I'm OK with it then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508179039



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BaseAction<R> implements Action<R> {
+
+  protected abstract Table table();
+
+  protected String metadataTableName(MetadataTableType type) {
+    return metadataTableName(table().name(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
+    if (tableName.contains("/")) {
+      return tableName + "#" + type;
+    } else if (tableName.startsWith("hadoop.")) {
+      // for HadoopCatalog tables, use the table location to load the metadata table
+      // because IcebergCatalog uses HiveCatalog when the table is identified by name
+      return table().location() + "#" + type;
+    } else if (tableName.startsWith("hive.")) {
+      // HiveCatalog prepend a logical name which we need to drop for Spark 2.4
+      return tableName.replaceFirst("hive\\.", "") + "." + type;

Review comment:
       Do we need this for flink ?  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714842987


   @openinx thanks for your review,I have updated the code according to your suggestion


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508190606



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private PartitionSpec spec = null;
+  private boolean caseSensitive;
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    fileIO = table.io();

Review comment:
       In iceberg, we usually use `this.fileIO=table.io` to assign the argument to an internal member in constructor. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-717685665


   I think this pathc is ready to go, so I merged this. Thanks all for reviewing. And thanks @zhangjun0x01 for contribution.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] simonsssu removed a comment on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
simonsssu removed a comment on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714177418


   @zhangjun0x01 I think it's a little tricky. What about your table contains 3 DataFiles and each of them was 90MB ? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-712252740


   I think this makes sense to me, one question though is do we leave "Actions" in the Spark Module? I wouldn't want to break the api there if possible.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r512751745



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -111,171 +49,17 @@ protected RewriteDataFilesAction self() {
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
+  protected FileIO fileIO() {
+    return SparkUtil.serializableFileIO(table());

Review comment:
       should we make this lazy? 
   
   private FileIO lazyFileIO;
   protected FileIO fileIO { 
      if (lazyFileIO == null) { 
        lazyFileIO =...
      }
     return lazyFileIo
   }
   }




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r509982809



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT>

Review comment:
       I use checkstyle to check, the number of characters in this line is 127, which is greater than the max of 120




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r509971821



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private PartitionSpec spec;
+  private boolean caseSensitive;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private long rewriteScanLimit;
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    this.fileIO = table.io();
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.rewriteScanLimit = PropertyUtil.propertyAsLong(

Review comment:
        I will rollback it, and I will open a new PR later if necessary




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 edited a comment on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 edited a comment on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714168491


   hi,@openinx ,I add a table properties'REWRITE_SCAN_LIMIT' ,and set the default value is 100M, in RewriteDataFilesAction, if the file size is greater  than this value, it will not be scanned, that is, it will not be compressed.
   
   Because the default value of the targetSizeInBytes is 128M.For example, my iceberg table has 3 datafiles, each of them is 120M. During the execution of Rewrite, the program will still scan these three files and then regenerate the 3 new files which file size  is the same as the original data. If we periodically perform a Rewrite operation on an iceberg table that is being written in real time, these 120M files will be compressed  repeatedly. I think this is unreasonable.
   
   In addition, in the process of combine FileScanTask into CombinedScanTask, it is difficult to ensure that the size of scanned data is exactly targetSizeInBytes (default: 128M).
   
   org.apache.iceberg.util.BinPacking.Bin#canAdd method.
   ```
       boolean canAdd(long weight) {
         return binWeight + weight <= targetWeight;
       }
   ```
   
   In most cases, the actual scan size will be less than  targetSizeInBytes, so I set this limit to the default 100M,not 128M


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-712726668


   Thank you very much for the review. @openinx ,@simonsssu ,@RussellSpitzer  I have not submitted a patch in the iceberg community before, so some rules are not well understood , make some low-level mistakes. I will pay attention to it later,I update the code.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508237727



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -112,170 +54,35 @@ protected RewriteDataFilesAction self() {
 
   @Override
   protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
+    return getTable();
   }
 
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
-  }
 
   @Override
   public RewriteDataFilesActionResult execute() {
-    CloseableIterable<FileScanTask> fileScanTasks = null;
-    try {
-      fileScanTasks = table.newScan()
-          .caseSensitive(caseSensitive)
-          .ignoreResiduals()
-          .filter(filter)
-          .planFiles();
-    } finally {
-      try {
-        if (fileScanTasks != null) {
-          fileScanTasks.close();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterable", ioe);
-      }
-    }
-
-    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
-    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
-        .filter(kv -> kv.getValue().size() > 1)
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = getFilteredGroupedTasks();
 
     // Nothing to rewrite if there's only one DataFile in each partition.
     if (filteredGroupedTasks.isEmpty()) {
       return RewriteDataFilesActionResult.empty();
     }
 
     // Split and combine tasks under each partition
-    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
-        .map(scanTasks -> {
-          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
-              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
-          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
-        })
-        .flatMap(Streams::stream)
-        .collect(Collectors.toList());
+    List<CombinedScanTask> combinedScanTasks = getCombinedScanTasks(filteredGroupedTasks);

Review comment:
       I did not extract into a method, because the following getCurrentDataFiles also used filteredGroupedTasks, if extracted into a method, then the groupTasksByPartition method will be executed twice




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r507596252



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {

Review comment:
       I don't think we did the thing as the commit log described exactly,  because what we really need it to abstract the following codes from spark module (that means the spark rewrite action should also use this common code),  rather than introducing a totally new `RewriteDataFilesActionBase` class and to be used for FLINK only.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508183953



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -112,170 +54,35 @@ protected RewriteDataFilesAction self() {
 
   @Override
   protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
+    return getTable();

Review comment:
       We usually don't use `getTable` in iceberg code because the prefix `get` does not have much meaning. It's good to use `protected Table table()` directly here. 
   
   Besides, I think we could just move the `protected Table table()` to the parent class, don't have to override it here then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-712627740


   > I think this makes sense to me, one question though is do we leave "Actions" in the Spark Module? I wouldn't want to break the api there if possible.
   
   The Actions  class involve some non-common functions, so I did not extract Actions. For example, spark Actions need SparkSession, and flink Actions need ExecutionEnvironment.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508186353



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -112,170 +54,35 @@ protected RewriteDataFilesAction self() {
 
   @Override
   protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
+    return getTable();
   }
 
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
-  }
 
   @Override
   public RewriteDataFilesActionResult execute() {
-    CloseableIterable<FileScanTask> fileScanTasks = null;
-    try {
-      fileScanTasks = table.newScan()
-          .caseSensitive(caseSensitive)
-          .ignoreResiduals()
-          .filter(filter)
-          .planFiles();
-    } finally {
-      try {
-        if (fileScanTasks != null) {
-          fileScanTasks.close();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterable", ioe);
-      }
-    }
-
-    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
-    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
-        .filter(kv -> kv.getValue().size() > 1)
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = getFilteredGroupedTasks();
 
     // Nothing to rewrite if there's only one DataFile in each partition.
     if (filteredGroupedTasks.isEmpty()) {
       return RewriteDataFilesActionResult.empty();
     }
 
     // Split and combine tasks under each partition
-    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
-        .map(scanTasks -> {
-          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
-              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
-          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
-        })
-        .flatMap(Streams::stream)
-        .collect(Collectors.toList());
+    List<CombinedScanTask> combinedScanTasks = getCombinedScanTasks(filteredGroupedTasks);

Review comment:
       I'd prefer to intergate all the line63~line71 to a protected method named `planTasks` : 
   
   ```java
     /**
      * Plan the {@link CombinedScanTask tasks} for this scan.
      * <p>
      * Tasks created by this method may read partial input files, multiple input files, or both.
      *
      * @return an Iterable of tasks for this scan
      */
     protected CloseableIterable<CombinedScanTask> planTasks(); 
   ```
   
   That's similar to the `planTasks` method in TableScan.java. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r512747994



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseRewriteDataFilesAction<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private boolean caseSensitive;
+  private PartitionSpec spec;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  public BaseRewriteDataFilesAction(Table table) {
+    this.table = table;
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  protected void caseSensitive(boolean newCaseSensitive) {
+    this.caseSensitive = newCaseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {

Review comment:
       I may be getting the pattern wrong here, but I know in most instances we use the method without a verb as the getter. So maybe it makes sense to have the setter be "setCaseSensative" and also have it return "this" for method chaining.
   
   Then have the getter be just "caseSensitive()"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r513130091



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseRewriteDataFilesAction<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+  private boolean caseSensitive;
+  private PartitionSpec spec;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+
+  public BaseRewriteDataFilesAction(Table table) {
+    this.table = table;
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    this.fileIO = fileIO();
+    this.encryptionManager = table.encryption();
+  }
+
+  @Override
+  protected Table table() {
+    return table;
+  }
+
+  protected EncryptionManager encryptionManager() {
+    return encryptionManager;
+  }
+
+  protected void caseSensitive(boolean newCaseSensitive) {
+    this.caseSensitive = newCaseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {

Review comment:
       I added a method: caseSensitive(boolean newCaseSensitive) for method chaining.and add the caseSensitive() for getter.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-712765921


   ok, I will write a document later


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508972822



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BaseAction<R> implements Action<R> {
+
+  protected abstract Table table();
+
+  protected String metadataTableName(MetadataTableType type) {
+    return metadataTableName(table().name(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
+    if (tableName.contains("/")) {
+      return tableName + "#" + type;
+    } else if (tableName.startsWith("hadoop.")) {
+      // for HadoopCatalog tables, use the table location to load the metadata table
+      // because IcebergCatalog uses HiveCatalog when the table is identified by name
+      return table().location() + "#" + type;
+    } else if (tableName.startsWith("hive.")) {
+      // HiveCatalog prepend a logical name which we need to drop for Spark 2.4
+      return tableName.replaceFirst("hive\\.", "") + "." + type;

Review comment:
       Yeah,  make sense to move them here for `RewriteManifestsAction` and `ExpireSnapshotsAction`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] simonsssu commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
simonsssu commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714184907


   @zhangjun0x01 I think your point is only scan the file which less then 100MB, I think it's a little tricky, if your table has 3 files and each of them was 99MB, they will also be put into FileScanTask right ? I think it will also regenerate 3 datafiles。


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508971602



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -19,263 +19,64 @@
 
 package org.apache.iceberg.actions;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.source.RowDataRewriter;
-import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.StructLikeWrapper;
-import org.apache.iceberg.util.TableScanUtil;
-import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class RewriteDataFilesAction
-    extends BaseSnapshotUpdateAction<RewriteDataFilesAction, RewriteDataFilesActionResult> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesAction.class);
+    extends RewriteDataFilesActionBase<RewriteDataFilesAction, RewriteDataFilesActionResult> {
 
   private final JavaSparkContext sparkContext;
-  private final Table table;
-  private final FileIO fileIO;
-  private final EncryptionManager encryptionManager;
-  private final boolean caseSensitive;
-  private long targetSizeInBytes;
-  private int splitLookback;
-  private long splitOpenFileCost;
-
-  private PartitionSpec spec = null;
-  private Expression filter;
 
   RewriteDataFilesAction(SparkSession spark, Table table) {
+    super(table);
     this.sparkContext = new JavaSparkContext(spark.sparkContext());
-    this.table = table;
-    this.spec = table.spec();
-    this.filter = Expressions.alwaysTrue();
-    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
-
-    long splitSize = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.SPLIT_SIZE,
-        TableProperties.SPLIT_SIZE_DEFAULT);
-    long targetFileSize = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
-
-    this.splitLookback = PropertyUtil.propertyAsInt(
-        table.properties(),
-        TableProperties.SPLIT_LOOKBACK,
-        TableProperties.SPLIT_LOOKBACK_DEFAULT);
-    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
-        table.properties(),
-        TableProperties.SPLIT_OPEN_FILE_COST,
-        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
-
-    this.fileIO = SparkUtil.serializableFileIO(table);
-    this.encryptionManager = table.encryption();
+    this.setCaseSensitive(Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false")));
   }
 
   @Override
   protected RewriteDataFilesAction self() {
     return this;
   }
 
-  @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
-  }
-
   @Override
   public RewriteDataFilesActionResult execute() {
-    CloseableIterable<FileScanTask> fileScanTasks = null;
-    try {
-      fileScanTasks = table.newScan()
-          .caseSensitive(caseSensitive)
-          .ignoreResiduals()
-          .filter(filter)
-          .planFiles();
-    } finally {
-      try {
-        if (fileScanTasks != null) {
-          fileScanTasks.close();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterable", ioe);
-      }
-    }
-
-    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
-    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
-        .filter(kv -> kv.getValue().size() > 1)
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = getFilteredGroupedTasks();
 
     // Nothing to rewrite if there's only one DataFile in each partition.
     if (filteredGroupedTasks.isEmpty()) {
       return RewriteDataFilesActionResult.empty();
     }
 
     // Split and combine tasks under each partition
-    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
-        .map(scanTasks -> {
-          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
-              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
-          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
-        })
-        .flatMap(Streams::stream)
-        .collect(Collectors.toList());
+    List<CombinedScanTask> combinedScanTasks = getCombinedScanTasks(filteredGroupedTasks);
 
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
 
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
+    Broadcast<FileIO> io = sparkContext.broadcast(SparkUtil.serializableFileIO(this.table()));
+    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(this.table().encryption());
 
-    RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
+    RowDataRewriter rowDataRewriter =
+        new RowDataRewriter(this.table(), this.table().spec(), isCaseSensitive(), io, encryption);
 
     List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);

Review comment:
       For my understanding,  this is only the different part between flink, spark, or other engines.  How about introducing  an abstracted method named `List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask, .. )`  in this base rewrite action class. 
   
   Then both spark and flink only need to implement this `rewriteDataForTasks` method,  that will make the code really simpler.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-717633551


   > One other note, now that "Actions" uses reflection to determine what actual implementation to use we may want to move that to "core" as well but I think we can save that for another PR.
   
   Yes, we can consider how to extract "Actions" for spark and flink later


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] simonsssu commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
simonsssu commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508251445



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private PartitionSpec spec = null;
+  private boolean caseSensitive = false;
+
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    fileIO = table.io();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.encryptionManager = table.encryption();
+    spec = table.spec();
+  }
+
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+
+  public RewriteDataFilesActionBase<ThisT, R> filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+  protected void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+      rewriteFiles.commit();
+
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+          .run(fileIO::deleteFile);
+
+      throw e;
+    }
+  }
+
+  protected Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+
+    try {

Review comment:
       Can we try like this ? 
      try (CloseableIterator<FileScanTask> iter = tasksIter) {
         iter.forEachRemaining(task -> {
           StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
           tasksGroupedByPartition.put(structLike, task);
         });
       } catch (IOException e) {
         LOG.warn("Failed to close task iterator", e);
       }




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714168491


   hi,@openinx ,I add a table properties'REWRITE_SCAN_LIMIT' ,and set the default value is 100M, in RewriteDataFilesAction, if the file size is greater  than this value, it will not be scanned, that is, it will not be compressed.
   
   Because the default value of the targetSizeInBytes is 128M.For example, my iceberg table has 3 datafiles, each of them is 120M. During the execution of Rewrite, the program will still scan these three files and then regenerate the 3 new files which file size  is the same as the original data. If we periodically perform a Rewrite operation on an iceberg table that is being written in real time, these 120M files will be compressed  repeatedly. I think this is unreasonable.
   
   In addition, in the process of combine FileScanTask into CombinedScanTask, it is difficult to ensure that the size of scanned data is exactly targetSizeInBytes (default: 128M).
   
   org.apache.iceberg.util.BinPacking.Bin#canAdd method.
   ```
       boolean canAdd(long weight) {
         return binWeight + weight <= targetWeight;
       }
   ```
   
   In most cases, the actual scan size will be less than  targetSizeInBytes, so I set this limit to the default 100M,


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508247404



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private PartitionSpec spec = null;
+  private boolean caseSensitive = false;
+
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    fileIO = table.io();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.encryptionManager = table.encryption();
+    spec = table.spec();
+  }
+
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+
+  public RewriteDataFilesActionBase<ThisT, R> filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+  protected void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+      rewriteFiles.commit();
+
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+          .run(fileIO::deleteFile);
+
+      throw e;
+    }
+  }
+
+  protected Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+
+    try {

Review comment:
       I try it,but get a compile error, Resource references are not supported at language level '8'




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-712831987


   > ok, I will write a document later
   
   Sorry,  I did not describe this thing clearly.   You don't have to provide a document for this PR, it's simple enough now.  I mean I might need to provide a more friendly document for the iceberg beginner to start how to contribute to iceberg.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r509912885



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT>

Review comment:
       nit:  we usually name it as `BaseRewriteDataFilesAction`  if it's an abstract class.  btw, seems we don't have to break `extends ..` into a new line because it does not exceed the max length .

##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private PartitionSpec spec;
+  private boolean caseSensitive;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private long rewriteScanLimit;
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    this.fileIO = table.io();
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.rewriteScanLimit = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.REWRITE_SCAN_LIMIT,
+        TableProperties.REWRITE_SCAN_LIMIT_DEFAULT);
+  }
+
+  protected void setCaseSensitive(boolean caseSensitive) {
+    this.caseSensitive = caseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {

Review comment:
       nit:   rename the `isCaseSensitive` to `caseSensitive` pls. for example, we `ManifestGroup` has the method : 
   ```java
     ManifestGroup caseSensitive(boolean newCaseSensitive) {
       this.caseSensitive = newCaseSensitive;
       deleteIndexBuilder.caseSensitive(newCaseSensitive);
       return this;
     }
   ```

##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -111,171 +49,12 @@ protected RewriteDataFilesAction self() {
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
-  }
-
-  @Override
-  public RewriteDataFilesActionResult execute() {
-    CloseableIterable<FileScanTask> fileScanTasks = null;
-    try {
-      fileScanTasks = table.newScan()
-          .caseSensitive(caseSensitive)
-          .ignoreResiduals()
-          .filter(filter)
-          .planFiles();
-    } finally {
-      try {
-        if (fileScanTasks != null) {
-          fileScanTasks.close();
-        }
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterable", ioe);
-      }
-    }
-
-    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
-    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
-        .filter(kv -> kv.getValue().size() > 1)
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-    // Nothing to rewrite if there's only one DataFile in each partition.
-    if (filteredGroupedTasks.isEmpty()) {
-      return RewriteDataFilesActionResult.empty();
-    }
-
-    // Split and combine tasks under each partition
-    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
-        .map(scanTasks -> {
-          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
-              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
-          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
-        })
-        .flatMap(Streams::stream)
-        .collect(Collectors.toList());
-
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
     JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-
-    Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
-    Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
-
-    RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
-
-    List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);
-    List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
-        .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
-        .collect(Collectors.toList());
-    replaceDataFiles(currentDataFiles, addedDataFiles);
-
-    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
-  }
-
-  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
-      CloseableIterator<FileScanTask> tasksIter) {
-    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
-        Maps.newHashMap(), Lists::newArrayList);
-
-    try {
-      tasksIter.forEachRemaining(task -> {
-        StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
-        tasksGroupedByPartition.put(structLike, task);
-      });
-
-    } finally {
-      try {
-        tasksIter.close();
-      } catch (IOException ioe) {
-        LOG.warn("Failed to close task iterator", ioe);
-      }
-    }
-
-    return tasksGroupedByPartition.asMap();
-  }
-
-  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
-    try {
-      RewriteFiles rewriteFiles = table.newRewrite();
-      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
-      commit(rewriteFiles);
-
-    } catch (Exception e) {
-      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
-          .noRetry()
-          .suppressFailureWhenFinished()
-          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
-          .run(fileIO::deleteFile);
-
-      throw e;
-    }
+    Broadcast<FileIO> io = sparkContext.broadcast(SparkUtil.serializableFileIO(this.table()));

Review comment:
       nit:  we usually use `this` to assign value to a local field, so that we could distinguish it's a local field member assignment or normal field assignment.  If call the private or protected methods, we don't use `this`. 

##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private PartitionSpec spec;
+  private boolean caseSensitive;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private long rewriteScanLimit;
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    this.fileIO = table.io();
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.rewriteScanLimit = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.REWRITE_SCAN_LIMIT,
+        TableProperties.REWRITE_SCAN_LIMIT_DEFAULT);
+  }
+
+  protected void setCaseSensitive(boolean caseSensitive) {
+    this.caseSensitive = caseSensitive;
+  }
+
+  protected boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+
+  /**
+   * Set the size of the scanned file. If the file size is greater than this value, it will not be scanned and will not
+   * be compressed.
+   *
+   * @param limitSize the limit size of the scanned file ,default is 100M
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT> rewriteScanLimit(long limitSize) {
+    Preconditions.checkArgument(limitSize > 0L, "Invalid rewriteScanLimit size .");
+    this.rewriteScanLimit = limitSize;
+    return this;
+  }
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+
+  public RewriteDataFilesActionBase<ThisT> filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+
+  /**
+   * Specify the target rewrite data file size in bytes
+   *
+   * @param targetSize size in bytes of rewrite data file
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT> targetSizeInBytes(long targetSize) {
+    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
+        targetSize);
+    this.targetSizeInBytes = targetSize;
+    return this;
+  }
+
+
+  private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+      commit(rewriteFiles);
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+          .run(fileIO::deleteFile);
+      throw e;
+    }
+  }
+
+  private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+    try (CloseableIterator<FileScanTask> iterator = tasksIter) {
+      iterator.forEachRemaining(task -> {
+        if (task.length() < rewriteScanLimit) {
+          StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
+          tasksGroupedByPartition.put(structLike, task);
+        }
+      });
+    } catch (IOException e) {
+      LOG.warn("Failed to close task iterator", e);
+    }
+    return tasksGroupedByPartition.asMap();
+  }
+
+  private List<CombinedScanTask> getCombinedScanTasks(
+      Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks) {
+    // Split and combine tasks under each partition
+    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
+        .map(scanTasks -> {
+          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
+          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
+        })
+        .flatMap(Streams::stream)
+        .collect(Collectors.toList());
+    return combinedScanTasks;
+  }
+
+  @Override
+  public RewriteDataFilesActionResult execute() {
+    CloseableIterable<FileScanTask> fileScanTasks = null;
+    try {
+      fileScanTasks = table.newScan()
+          .caseSensitive(caseSensitive)
+          .ignoreResiduals()
+          .filter(filter)
+          .planFiles();
+    } finally {
+      try {
+        if (fileScanTasks != null) {
+          fileScanTasks.close();
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Failed to close task iterable", ioe);
+      }
+    }
+
+    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
+    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
+        .filter(kv -> kv.getValue().size() > 1)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    // Nothing to rewrite if there's only one DataFile in each partition.
+    if (filteredGroupedTasks.isEmpty()) {
+      return RewriteDataFilesActionResult.empty();
+    }
+    // Split and combine tasks under each partition
+    List<CombinedScanTask> combinedScanTasks = getCombinedScanTasks(filteredGroupedTasks);

Review comment:
       Is it necessary to move the split & combine parts into a separate method ?  I think the original codes is more clear and we don't have to change it,  it break the steps into several code blocks and each block represent one step.  Pls keep the code as it is if we don't have a strong reason to change it.

##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT>
+    extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private PartitionSpec spec;
+  private boolean caseSensitive;
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private long rewriteScanLimit;
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    this.fileIO = table.io();
+    this.spec = table.spec();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.rewriteScanLimit = PropertyUtil.propertyAsLong(

Review comment:
       This is a refactor PR while introducing the `rewriteScanLimit` seems to be a new feature,  we'd better not mix the refactor & new feature development in a single PR.  It's helpful for reviewing and providing full unit tests if we really need the `rewriteScanLimit`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] simonsssu commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
simonsssu commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r507423811



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private PartitionSpec spec = null;
+  private boolean caseSensitive = false;
+

Review comment:
       Useless blank line.

##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private PartitionSpec spec = null;
+  private boolean caseSensitive = false;
+
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    fileIO = table.io();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.encryptionManager = table.encryption();
+    spec = table.spec();
+  }
+
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+
+  public RewriteDataFilesActionBase<ThisT, R> filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+  protected void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+      rewriteFiles.commit();
+
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+          .run(fileIO::deleteFile);
+
+      throw e;
+    }
+  }
+
+  protected Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+
+    try {

Review comment:
       can we use try-with-resource to simply the logic of handle exception here ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508267694



##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseAction.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BaseAction<R> implements Action<R> {
+
+  protected abstract Table table();
+
+  protected String metadataTableName(MetadataTableType type) {
+    return metadataTableName(table().name(), type);
+  }
+
+  protected String metadataTableName(String tableName, MetadataTableType type) {
+    if (tableName.contains("/")) {
+      return tableName + "#" + type;
+    } else if (tableName.startsWith("hadoop.")) {
+      // for HadoopCatalog tables, use the table location to load the metadata table
+      // because IcebergCatalog uses HiveCatalog when the table is identified by name
+      return table().location() + "#" + type;
+    } else if (tableName.startsWith("hive.")) {
+      // HiveCatalog prepend a logical name which we need to drop for Spark 2.4
+      return tableName.replaceFirst("hive\\.", "") + "." + type;

Review comment:
       Flink is not used for the time being. I think this method is used for metadata, so I also extracted it. Currently we have not done RewriteManifestsAction and ExpireSnapshotsAction for flink. I think if we do it in the future, it should be useful




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508932341



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesActionBase.class);
+
+  private final Table table;
+  private final FileIO fileIO;
+  private final EncryptionManager encryptionManager;
+
+  private Expression filter;
+  private long targetSizeInBytes;
+  private int splitLookback;
+  private long splitOpenFileCost;
+  private PartitionSpec spec = null;
+  private boolean caseSensitive = false;
+
+
+  public RewriteDataFilesActionBase(Table table) {
+    this.table = table;
+    fileIO = table.io();
+    this.filter = Expressions.alwaysTrue();
+    long splitSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_SIZE,
+        TableProperties.SPLIT_SIZE_DEFAULT);
+    long targetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+    this.splitLookback = PropertyUtil.propertyAsInt(
+        table.properties(),
+        TableProperties.SPLIT_LOOKBACK,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    this.splitOpenFileCost = PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.SPLIT_OPEN_FILE_COST,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
+    this.encryptionManager = table.encryption();
+    spec = table.spec();
+  }
+
+
+  /**
+   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
+   * filter may be rewritten.
+   *
+   * @param expr Expression to filter out DataFiles
+   * @return this for method chaining
+   */
+
+  public RewriteDataFilesActionBase<ThisT, R> filter(Expression expr) {
+    this.filter = Expressions.and(filter, expr);
+    return this;
+  }
+
+  /**
+   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
+   *
+   * @param specId PartitionSpec id to rewrite
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> outputSpecId(int specId) {
+    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
+    this.spec = table.specs().get(specId);
+    return this;
+  }
+
+  /**
+   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+   * planning cost.
+   * <p>
+   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
+   * metadata, user can use a lookback of 1.
+   *
+   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitLookback(int lookback) {
+    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
+    this.splitLookback = lookback;
+    return this;
+  }
+
+  /**
+   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
+   * threshold, Iceberg will use this value to do count.
+   * <p>
+   * this configuration controls the number of files to compact for each task, small value would lead to a high
+   * compaction, the default value is 4MB.
+   *
+   * @param openFileCost minimum file size to count to pack into one "bin".
+   * @return this for method chaining
+   */
+  public RewriteDataFilesActionBase<ThisT, R> splitOpenFileCost(long openFileCost) {
+    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
+    this.splitOpenFileCost = openFileCost;
+    return this;
+  }
+
+  protected void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
+    try {
+      RewriteFiles rewriteFiles = table.newRewrite();
+      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
+      rewriteFiles.commit();
+
+    } catch (Exception e) {
+      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
+          .noRetry()
+          .suppressFailureWhenFinished()
+          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
+          .run(fileIO::deleteFile);
+
+      throw e;
+    }
+  }
+
+  protected Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
+      CloseableIterator<FileScanTask> tasksIter) {
+    ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
+        Maps.newHashMap(), Lists::newArrayList);
+
+    try {

Review comment:
       This is ok, I updated the code




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-714201951


   hi,@simonsssu:
   No matter how much this value is set, there will always be files smaller than the threshold that are repeatedly compressed. My idea is to find a compromise in various cases, such as do Rewrite Action on continuously written streaming data. First One Rewrite Action generated a 99M file, but when the second Rewrite Action was performed, the streaming task generated a lot of small files again. For example, if a file is 10M,  this 99M file and 10M files are merged into 109M,  to avoid this 99M file be compressed repeatedly.
   
   If we only temporarily rewrite the datafile once, the 99M file is regenerated, I think it is acceptable.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#issuecomment-717303333


   One other note, now that "Actions" uses reflection to determine what actual implementation to use we may want to move that to "core" as well but I think we can save that for another PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhangjun0x01 commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r507663775



##########
File path: core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionBase.java
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class RewriteDataFilesActionBase<ThisT, R> extends BaseSnapshotUpdateAction<ThisT, R> {

Review comment:
       I'm very sorry, this is my mistake. I wanted to do RewriteDataFilesAction of flink and extract the common logic to iceberg-core first, and then refactor spark, but finally found that there was a problem with checkstyle, so I put some common code in iceberg-core to solve the checkstyle, forgot to merge the spark code.
   
   I have update the code




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1624: extract some common functions to iceberg-core

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r513147767



##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -111,171 +49,17 @@ protected RewriteDataFilesAction self() {
   }
 
   @Override
-  protected Table table() {
-    return table;
-  }
-
-  /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction outputSpecId(int specId) {
-    Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
-    this.spec = table.specs().get(specId);
-    return this;
-  }
-
-  /**
-   * Specify the target rewrite data file size in bytes
-   *
-   * @param targetSize size in bytes of rewrite data file
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
-    Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
-        targetSize);
-    this.targetSizeInBytes = targetSize;
-    return this;
-  }
-
-  /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task.
-   * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
-   * task with extra planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
-   *
-   * @param lookback number of "bins" considered when trying to pack the next file split into a task.
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitLookback(int lookback) {
-    Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
-    this.splitLookback = lookback;
-    return this;
-  }
-
-  /**
-   * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
-   * threshold, Iceberg will use this value to do count.
-   * <p>
-   * this configuration controls the number of files to compact for each task, small value would lead to a
-   * high compaction, the default value is 4MB.
-   *
-   * @param openFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
-    Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
-    this.splitOpenFileCost = openFileCost;
-    return this;
-  }
-
-  /**
-   * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
-   * filter may be rewritten.
-   *
-   * @param expr Expression to filter out DataFiles
-   * @return this for method chaining
-   */
-  public RewriteDataFilesAction filter(Expression expr) {
-    this.filter = Expressions.and(filter, expr);
-    return this;
+  protected FileIO fileIO() {
+    return SparkUtil.serializableFileIO(table());

Review comment:
       That would save lots of fileIO serialization if we make this lazy, I agree with @RussellSpitzer . The upper layer may call this fileIO method several times, while we only need to serialize once. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org