Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/24 21:45:42 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request #2379: Compaction Strategies

RussellSpitzer opened a new pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379


   Hey everyone,
   I've been thinking for a while about redoing some things in the RewriteDatafiles code to give everyone a bit more flexibility in how datafile compactions are run.
   
   I did some work rewriting the proposal previously submitted by @aokolnychyi and wrote up a prototype of how I kind of imagine some of it working.
   https://docs.google.com/document/d/1aXo1VzuXxSuqcTzMLSQdnivMVgtLExgDWUFMvWeXRxc/edit?ts=600b0432#heading=h.rz9lj3frnyw7
   
   If you are interested, please take a look at the design doc ^ and then check out the PR. I'm hoping that with this kind of new API we'll be able to introduce a variety of mechanisms for dealing with many files.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609298267



##########
File path: core/src/main/java/org/apache/iceberg/spark/actions/compaction/BinningCompactionStrategy.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.util.StreamUtil.streamOf;
+
+/**
+ * A Compaction strategy for rewriting files based on their sizes, the goal is to end up with files
+ * whose size is close to the Target Size as specified in this strategy's options.
+ */
+public abstract class BinningCompactionStrategy implements DataCompactionStrategy {
+
+
+  public static final String TARGET_SIZE_OPTION = "target_size";
+  public static final String TARGET_THRESHOLD_OPTION = "target_threshold";
+
+  // TODO Maybe these should be global?
+  public static final String MAX_JOB_SIZE_OPTION = "max_job_size";
+
+  protected static final long TARGET_THRESHOLD_DEFAULT = 50 * 1024 * 1024; // 50 MB
+  protected static final long TARGET_SIZE_DEFAULT = 512 * 1024 * 1024; // 512 MB // TODO (Table default)
+  protected static final long MAX_JOB_SIZE_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB
+
+  private long targetThreshold = TARGET_THRESHOLD_DEFAULT;
+  private long targetSize = TARGET_SIZE_DEFAULT;
+  private long maxJobSize = MAX_JOB_SIZE_DEFAULT;
+
+  @Override
+  public String name() {
+    return "Binning Size Based Compaction Strategy";
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.of(TARGET_SIZE_OPTION, TARGET_THRESHOLD_OPTION, MAX_JOB_SIZE_OPTION);
+  }
+
+  @Override
+  public Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles) {
+    return streamOf(dataFiles)
+        .filter(task -> Math.abs(task.file().fileSizeInBytes() - targetSize) > targetThreshold)
+        .iterator();
+  }
+
+  @Override
+  public Iterator<List<FileScanTask>> groupsFilesIntoJobs(Iterator<FileScanTask> dataFiles) {
+    BinPacking.ListPacker<FileScanTask> packer = new BinPacking.ListPacker<>(maxJobSize, 1, false);

Review comment:
       I think we will need to discuss how to pack files and whether we need to look at the min/max stats on the sort key, for example. Does not have to be done now, though.
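
To make the packing question concrete, here is a minimal, self-contained sketch of the two steps in the diff above: the size filter from `filesToCompact` and a simple first-fit grouping bounded by a maximum job size. It is an illustration only and does not use Iceberg's `BinPacking.ListPacker`; `SizeBinningSketch`, `selectForRewrite`, and `packFirstFit` are hypothetical names, and plain `long` sizes stand in for `FileScanTask`s.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the strategy in the diff; file sizes are plain longs.
final class SizeBinningSketch {
  private SizeBinningSketch() {
  }

  // Keep only files whose size differs from the target by more than the threshold,
  // mirroring the filter shown in filesToCompact.
  static List<Long> selectForRewrite(List<Long> fileSizes, long targetSize, long threshold) {
    List<Long> selected = new ArrayList<>();
    for (long size : fileSizes) {
      if (Math.abs(size - targetSize) > threshold) {
        selected.add(size);
      }
    }
    return selected;
  }

  // First-fit packing: place each file into the first group that still has room and
  // open a new group when none does. A file larger than maxJobSize gets its own group.
  static List<List<Long>> packFirstFit(List<Long> fileSizes, long maxJobSize) {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> groupTotals = new ArrayList<>();
    for (long size : fileSizes) {
      int target = -1;
      for (int i = 0; i < groups.size(); i++) {
        if (groupTotals.get(i) + size <= maxJobSize) {
          target = i;
          break;
        }
      }
      if (target < 0) {
        groups.add(new ArrayList<>());
        groupTotals.add(0L);
        target = groups.size() - 1;
      }
      groups.get(target).add(size);
      groupTotals.set(target, groupTotals.get(target) + size);
    }
    return groups;
  }
}
```

A packer that takes min/max stats on the sort key into account would presumably replace the first-fit step, for example by ordering or clustering files by key range before grouping; the size filter could stay as it is.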






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609282344



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * Pass an already constructed DataCompactionStrategy
+   *
+   * @param strategy the algorithm to be used during compaction
+   * @return
+   */
+  CompactDataFiles compactionStrategy(DataCompactionStrategy strategy);
+
+  // Todo Do we want to allow generic class loading here? I think yes this will probably be framework specific
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the
+   * Compaction strategy. This will be used in addition to whatever rules the Compaction strategy
+   * generates.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  class Result {

Review comment:
       I think we have to be consistent. Either all actions should define their results as interfaces or as classes. Does not have to be addressed now.






[GitHub] [iceberg] openinx commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609346859



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * Pass an already constructed DataCompactionStrategy
+   *
+   * @param strategy the algorithm to be used during compaction
+   * @return
+   */
+  CompactDataFiles compactionStrategy(DataCompactionStrategy strategy);
+
+  // Todo Do we want to allow generic class loading here? I think yes this will probably be framework specific
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the
+   * Compaction strategy. This will be used in addition to whatever rules the Compaction strategy
+   * generates.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  class Result {
+    private Map<CompactionJobInfo, CompactionResult> resultMap;
+
+
+    public Result(
+        Map<CompactionJobInfo, CompactionResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionJobInfo, CompactionResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  class CompactionResult {
+    private int numFilesRemoved;
+    private int numFilesAdded;
+
+    public CompactionResult(int numFilesRemoved, int numFilesAdded) {
+      this.numFilesRemoved = numFilesRemoved;
+      this.numFilesAdded = numFilesAdded;
+    }
+
+    public int numFilesAdded() {
+      return numFilesAdded;
+    }
+
+    public int numFilesRemoved() {
+      return numFilesRemoved;
+    }
+  }
+
+  class CompactionJobInfo {
+    private final int jobIndex;
+    private final int partitionIndex;
+    private final StructLike partition;
+
+    public CompactionJobInfo(int jobIndex, int partitionIndex, StructLike partition) {
+      this.jobIndex = jobIndex;
+      this.partitionIndex = partitionIndex;
+      this.partition = partition;
+    }
+
+    public int jobIndex() {
+      return jobIndex;
+    }
+
+    public int partitionIndex() {
+      return partitionIndex;
+    }
+
+    public StructLike partition() {
+      return partition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CompactionJobInfo that = (CompactionJobInfo) o;
+      return jobIndex == that.jobIndex && partitionIndex == that.partitionIndex &&
+          Objects.equals(partition, that.partition);

Review comment:
       The correct way to compare two different `StructLike` instances is to use the comparator built from `Comparators#forType`, because almost none of the `StructLike` implementations implement the `equals` and `hashCode` methods. One example is: https://github.com/apache/iceberg/blob/90225d6c9413016d611e2ce5eff37db1bc1b4fc5/core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java#L76 
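
The underlying problem can be shown without any Iceberg classes. The sketch below is a self-contained illustration, not Iceberg code: `Row`, `ArrayRow`, and `RowWrapper` are hypothetical stand-ins for `StructLike`, one of its implementations, and a wrapper in the spirit of `StructLikeWrapper`. It uses plain `Objects.equals` per field rather than a type-aware comparator from `Comparators#forType`, which is a simplification.

```java
import java.util.Objects;

// Hypothetical stand-in for StructLike: positional access only, no equals/hashCode contract.
interface Row {
  int size();

  Object get(int pos);
}

// A Row implementation that inherits identity-based equals and hashCode from Object.
final class ArrayRow implements Row {
  private final Object[] values;

  ArrayRow(Object... values) {
    this.values = values.clone();
  }

  @Override
  public int size() {
    return values.length;
  }

  @Override
  public Object get(int pos) {
    return values[pos];
  }
}

// Hypothetical wrapper that supplies value-based equals and hashCode for any Row,
// so wrapped rows can be used safely as map keys or inside Objects.equals/Objects.hash.
final class RowWrapper {
  private final Row row;

  RowWrapper(Row row) {
    this.row = row;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RowWrapper)) {
      return false;
    }
    Row other = ((RowWrapper) o).row;
    if (row.size() != other.size()) {
      return false;
    }
    for (int i = 0; i < row.size(); i++) {
      if (!Objects.equals(row.get(i), other.get(i))) {
        return false;
      }
    }
    return true;
  }

  @Override
  public int hashCode() {
    int result = 1;
    for (int i = 0; i < row.size(); i++) {
      result = 31 * result + Objects.hashCode(row.get(i));
    }
    return result;
  }
}
```

With these stand-ins, two `ArrayRow` instances holding the same values are not equal to each other, while their `RowWrapper`s are equal and hash alike. That is why a key class such as `CompactionJobInfo` would need to wrap or explicitly compare its `partition` field rather than pass it to `Objects.equals` and `Objects.hash` directly.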

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/compaction/SparkBinningCompactionStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.source.SourceUtil;
+import org.apache.spark.sql.SparkSession;
+
+public class SparkBinningCompactionStrategy extends BinningCompactionStrategy {

Review comment:
       Here we've introduced a `SparkBinningCompactionStrategy`, which sounds like it is customized for the Spark query engine. I think the compaction strategy is common to the various query engines: it just uses its own strategy to plan the sub-tasks for each job, while the real job execution is bound to the query engine. Maybe we will need to move `rewriteFiles` out of the strategy interfaces.
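
One possible shape for that split, sketched under assumptions and not taken from the PR: an engine-agnostic planner that only groups files, and an engine-specific rewriter that executes one group. `CompactionPlanner`, `GroupRewriter`, and `CompactionDriver` are hypothetical names, and the type parameter `F` stands in for whatever file or task type the action uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical engine-agnostic half: decides how files are grouped into jobs.
interface CompactionPlanner<F> {
  List<List<F>> plan(List<F> files);
}

// Hypothetical engine-specific half: rewrites one group and returns the files it produced.
interface GroupRewriter<F> {
  List<F> rewrite(List<F> group);
}

// Glue code an action could own: plan once, then hand each group to the engine.
final class CompactionDriver {
  private CompactionDriver() {
  }

  static <F> List<F> run(CompactionPlanner<F> planner, GroupRewriter<F> rewriter, List<F> files) {
    List<F> added = new ArrayList<>();
    for (List<F> group : planner.plan(files)) {
      added.addAll(rewriter.rewrite(group));
    }
    return added;
  }
}
```

Under this arrangement a binning planner would be written once, and Spark or another engine would supply only the `GroupRewriter`.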

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * Pass an already constructed DataCompactionStrategy
+   *
+   * @param strategy the algorithm to be used during compaction
+   * @return
+   */
+  CompactDataFiles compactionStrategy(DataCompactionStrategy strategy);
+
+  // Todo Do we want to allow generic class loading here? I think yes this will probably be framework specific
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the
+   * Compaction strategy. This will be used in addition to whatever rules the Compaction strategy
+   * generates.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  class Result {
+    private Map<CompactionJobInfo, CompactionResult> resultMap;
+
+
+    public Result(
+        Map<CompactionJobInfo, CompactionResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionJobInfo, CompactionResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  class CompactionResult {
+    private int numFilesRemoved;
+    private int numFilesAdded;
+
+    public CompactionResult(int numFilesRemoved, int numFilesAdded) {
+      this.numFilesRemoved = numFilesRemoved;
+      this.numFilesAdded = numFilesAdded;
+    }
+
+    public int numFilesAdded() {
+      return numFilesAdded;
+    }
+
+    public int numFilesRemoved() {
+      return numFilesRemoved;
+    }
+  }
+
+  class CompactionJobInfo {
+    private final int jobIndex;
+    private final int partitionIndex;
+    private final StructLike partition;
+
+    public CompactionJobInfo(int jobIndex, int partitionIndex, StructLike partition) {
+      this.jobIndex = jobIndex;
+      this.partitionIndex = partitionIndex;
+      this.partition = partition;
+    }
+
+    public int jobIndex() {
+      return jobIndex;
+    }
+
+    public int partitionIndex() {
+      return partitionIndex;
+    }
+
+    public StructLike partition() {
+      return partition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CompactionJobInfo that = (CompactionJobInfo) o;
+      return jobIndex == that.jobIndex && partitionIndex == that.partitionIndex &&
+          Objects.equals(partition, that.partition);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(jobIndex, partitionIndex, partition);

Review comment:
       The `partition` instance will need `JavaHash.forType()` to generate the hash code.

##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);
+
+  /**
+   * Groups file scans into lists which will be processed in a single executable
+   * unit. Each group will end up being committed as an independent set of
+   * changes. This creates the jobs which will eventually be run as by the underlying Action.
+   *
+   * @param dataFiles iterator of files to be rewritten
+   * @return iterator of sets of files to be processed together
+   */
+  Iterator<List<FileScanTask>> groupsFilesIntoJobs(Iterator<FileScanTask> dataFiles);
+
+  /**
+   * Method which will actually rewrite and commit changes to a group of files
+   * based on the particular CompactionStrategy's Algorithm. This will most likely be
+   * Action framework specific.
+   *
+   * @param table table being modified
+   * @param filesToRewrite a group of files to be rewritten together
+   * @return a list of newly committed files
+   */
+  List<DataFile> rewriteFiles(Table table, List<FileScanTask> filesToRewrite);

Review comment:
       I think we need to consider format v2's compaction strategy, at least at the API design level. Otherwise we will have to deprecate this API soon after we introduce the compaction implementation for v2 in https://github.com/apache/iceberg/pull/2303/files






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609725115



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SourceUtil.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl$;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation$;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SourceUtil {
+
+  private SourceUtil() {
+  }
+
+  public static List<DataFile> rewriteFiles(SparkSession spark, Table table, List<FileScanTask> filesToRewrite,

Review comment:
       I was thinking of this as well. When I started writing it I just ended up copying a ton of code, and I wasn't sure if I could make it work without using the Merge rewrite plan anyway.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609291090



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);

Review comment:
       Will this be invoked for each partition after we do planning using the user-defined expression?






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609718416



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);

Review comment:
       Yeah, I'm trying to differentiate manifest entry pushdowns from algorithm-specific filtering.
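
The distinction between the two filtering stages can be sketched as follows. This is only an illustration under stated assumptions, not code from the PR: `PlannedFile` and `TwoStageSelection` are hypothetical stand-ins, plain `Predicate`s replace Iceberg expressions, and the sketch assumes the strategy filter is applied to one partition's files at a time, as the Javadoc's "in a given partition" suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for a FileScanTask; only the fields this sketch needs.
final class PlannedFile {
  final String partition;
  final long sizeInBytes;

  PlannedFile(String partition, long sizeInBytes) {
    this.partition = partition;
    this.sizeInBytes = sizeInBytes;
  }
}

final class TwoStageSelection {
  private TwoStageSelection() {
  }

  // Stage 1 (pushdown) prunes files during planning.
  // Stage 2 (strategy filter) then runs on each partition's surviving files.
  static Map<String, List<PlannedFile>> select(
      List<PlannedFile> files,
      Predicate<PlannedFile> pushdown,
      Predicate<PlannedFile> strategyFilter) {
    Map<String, List<PlannedFile>> byPartition = new LinkedHashMap<>();
    for (PlannedFile file : files) {
      if (pushdown.test(file)) {
        byPartition.computeIfAbsent(file.partition, key -> new ArrayList<>()).add(file);
      }
    }

    Map<String, List<PlannedFile>> selected = new LinkedHashMap<>();
    for (Map.Entry<String, List<PlannedFile>> entry : byPartition.entrySet()) {
      List<PlannedFile> kept = new ArrayList<>();
      for (PlannedFile file : entry.getValue()) {
        if (strategyFilter.test(file)) {
          kept.add(file);
        }
      }
      // Partitions with nothing to rewrite are dropped from the result.
      if (!kept.isEmpty()) {
        selected.put(entry.getKey(), kept);
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<PlannedFile> files = Arrays.asList(
        new PlannedFile("day=1", 10L),
        new PlannedFile("day=1", 500L),
        new PlannedFile("day=2", 20L),
        new PlannedFile("day=3", 30L));

    Map<String, List<PlannedFile>> result = select(
        files,
        file -> !file.partition.equals("day=3"), // pushdown: skip a whole partition
        file -> file.sizeInBytes < 100L);        // strategy: keep only small files

    for (Map.Entry<String, List<PlannedFile>> entry : result.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " file(s)");
    }
  }
}
```

In the real API the first stage would presumably be an Iceberg `Expression` evaluated against manifest entries, which is why it cannot run arbitrary code the way the second stage can.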






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609716553



##########
File path: core/src/main/java/org/apache/iceberg/spark/actions/compaction/BinningCompactionStrategy.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.util.StreamUtil.streamOf;
+
+/**
+ * A Compaction strategy for rewriting files based on their sizes, the goal is to end up with files
+ * whose size is close to the Target Size as specified in this strategy's options.
+ */
+public abstract class BinningCompactionStrategy implements DataCompactionStrategy {
+
+
+  public static final String TARGET_SIZE_OPTION = "target_size";
+  public static final String TARGET_THRESHOLD_OPTION = "target_threshold";
+
+  // TODO Maybe these should be global?
+  public static final String MAX_JOB_SIZE_OPTION = "max_job_size";
+
+  protected static final long TARGET_THRESHOLD_DEFAULT = 50 * 1024 * 1024; // 50 MB
+  protected static final long TARGET_SIZE_DEFAULT = 512 * 1024 * 1024; // 512 MB // TODO (Table default)
+  protected static final long MAX_JOB_SIZE_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB
+
+  private long targetThreshold = TARGET_THRESHOLD_DEFAULT;
+  private long targetSize = TARGET_SIZE_DEFAULT;
+  private long maxJobSize = MAX_JOB_SIZE_DEFAULT;
+
+  @Override
+  public String name() {
+    return "Binning Size Based Compaction Strategy";
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.of(TARGET_SIZE_OPTION, TARGET_THRESHOLD_OPTION, MAX_JOB_SIZE_OPTION);
+  }
+
+  @Override
+  public Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles) {
+    return streamOf(dataFiles)
+        .filter(task -> Math.abs(task.file().fileSizeInBytes() - targetSize) > targetThreshold)
+        .iterator();
+  }
+
+  @Override
+  public Iterator<List<FileScanTask>> groupsFilesIntoJobs(Iterator<FileScanTask> dataFiles) {
+    BinPacking.ListPacker<FileScanTask> packer = new BinPacking.ListPacker<>(maxJobSize, 1, false);

Review comment:
       Yep, I'm assuming some algorithms may want to do that all the time.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609715625



##########
File path: core/src/main/java/org/apache/iceberg/spark/actions/compaction/BinningCompactionStrategy.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.util.StreamUtil.streamOf;
+
+/**
+ * A Compaction strategy for rewriting files based on their sizes, the goal is to end up with files
+ * whose size is close to the Target Size as specified in this strategy's options.
+ */
+public abstract class BinningCompactionStrategy implements DataCompactionStrategy {
+
+
+  public static final String TARGET_SIZE_OPTION = "target_size";
+  public static final String TARGET_THRESHOLD_OPTION = "target_threshold";
+
+  // TODO Maybe these should be global?
+  public static final String MAX_JOB_SIZE_OPTION = "max_job_size";
+
+  protected static final long TARGET_THRESHOLD_DEFAULT = 50 * 1024 * 1024; // 50 MB
+  protected static final long TARGET_SIZE_DEFAULT = 512 * 1024 * 1024; // 512 MB // TODO (Table default)
+  protected static final long MAX_JOB_SIZE_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB
+
+  private long targetThreshold = TARGET_THRESHOLD_DEFAULT;
+  private long targetSize = TARGET_SIZE_DEFAULT;
+  private long maxJobSize = MAX_JOB_SIZE_DEFAULT;
+
+  @Override
+  public String name() {
+    return "Binning Size Based Compaction Strategy";
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.of(TARGET_SIZE_OPTION, TARGET_THRESHOLD_OPTION, MAX_JOB_SIZE_OPTION);
+  }
+
+  @Override
+  public Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles) {
+    return streamOf(dataFiles)
+        .filter(task -> Math.abs(task.file().fileSizeInBytes() - targetSize) > targetThreshold)

Review comment:
       I think that's a good idea. This is really just a prototype so we can build consensus on the approach; I didn't want to code in all the features just yet.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609296096



##########
File path: core/src/main/java/org/apache/iceberg/spark/actions/compaction/BinningCompactionStrategy.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.util.StreamUtil.streamOf;
+
+/**
+ * A Compaction strategy for rewriting files based on their sizes, the goal is to end up with files
+ * whose size is close to the Target Size as specified in this strategy's options.
+ */
+public abstract class BinningCompactionStrategy implements DataCompactionStrategy {
+
+
+  public static final String TARGET_SIZE_OPTION = "target_size";
+  public static final String TARGET_THRESHOLD_OPTION = "target_threshold";
+
+  // TODO Maybe these should be global?
+  public static final String MAX_JOB_SIZE_OPTION = "max_job_size";
+
+  protected static final long TARGET_THRESHOLD_DEFAULT = 50 * 1024 * 1024; // 50 MB
+  protected static final long TARGET_SIZE_DEFAULT = 512 * 1024 * 1024; // 512 MB // TODO (Table default)
+  protected static final long MAX_JOB_SIZE_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB
+
+  private long targetThreshold = TARGET_THRESHOLD_DEFAULT;
+  private long targetSize = TARGET_SIZE_DEFAULT;
+  private long maxJobSize = MAX_JOB_SIZE_DEFAULT;
+
+  @Override
+  public String name() {
+    return "Binning Size Based Compaction Strategy";
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.of(TARGET_SIZE_OPTION, TARGET_THRESHOLD_OPTION, MAX_JOB_SIZE_OPTION);
+  }
+
+  @Override
+  public Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles) {
+    return streamOf(dataFiles)
+        .filter(task -> Math.abs(task.file().fileSizeInBytes() - targetSize) > targetThreshold)

Review comment:
       I think we should have separate thresholds for min and max allowed size as discussed [here](https://docs.google.com/document/d/1aXo1VzuXxSuqcTzMLSQdnivMVgtLExgDWUFMvWeXRxc/edit?disco=AAAAJ3NGjg8).
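
One way to read the suggestion is sketched below. It is an assumption-laden illustration rather than the design agreed in the doc: the class `SizeWindow` and the names `minFileSize` and `maxFileSize` are hypothetical, and the rule "rewrite anything outside the closed range" is only one possible interpretation of separate min and max thresholds.

```java
// Hypothetical replacement for the single symmetric threshold around the target size.
final class SizeWindow {
  private final long minFileSize;
  private final long maxFileSize;

  SizeWindow(long minFileSize, long maxFileSize) {
    if (minFileSize < 0 || minFileSize > maxFileSize) {
      throw new IllegalArgumentException(
          String.format("Invalid size window: min=%d, max=%d", minFileSize, maxFileSize));
    }
    this.minFileSize = minFileSize;
    this.maxFileSize = maxFileSize;
  }

  // A file is a rewrite candidate when it is too small or too large.
  // Files exactly on either boundary are left alone.
  boolean shouldRewrite(long fileSizeInBytes) {
    return fileSizeInBytes < minFileSize || fileSizeInBytes > maxFileSize;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    SizeWindow window = new SizeWindow(384 * mb, 640 * mb);
    System.out.println(window.shouldRewrite(10 * mb));  // small file
    System.out.println(window.shouldRewrite(512 * mb)); // inside the window
    System.out.println(window.shouldRewrite(900 * mb)); // oversized file
  }
}
```

Compared with `Math.abs(size - targetSize) > targetThreshold`, two independent bounds let small files be compacted aggressively while tolerating moderately oversized ones, or the reverse.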






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609969412



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SourceUtil.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl$;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation$;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SourceUtil {
+
+  private SourceUtil() {
+  }
+
+  public static List<DataFile> rewriteFiles(SparkSession spark, Table table, List<FileScanTask> filesToRewrite,

Review comment:
       A summary of what we discussed this morning.
   
   There are 3 options we are considering now:
   - Build a Spark plan and execute it.
     - Too fragile; depends on low-level Spark details that change frequently.
   - Save the file scan tasks to compact into separate file(s) and pass the location of those file(s) to the source, which would read the tasks from there instead of doing scan planning. The file could be a manifest list or something else.
     - Incurs a performance penalty for writing extra files that describe what to compact.
   - Have a static map/distributed cache that will be propagated in strategies (compaction_id -> tasks) and pass compaction_id as a read option to either the current source or a custom data source responsible for compaction.
     - TBD.
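
The third option, whose details are still TBD above, might look roughly like the sketch below. Everything here is hypothetical: `CompactionTaskRegistry` is an invented name, tasks are represented as plain file-path strings instead of `FileScanTask`s, and a static map only works when the registering code and the source run in the same JVM.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-process registry mapping compaction_id -> tasks.
final class CompactionTaskRegistry {
  private static final Map<String, List<String>> TASKS = new ConcurrentHashMap<>();

  private CompactionTaskRegistry() {
  }

  // Stores an immutable copy of the tasks and returns the id to pass as a read option.
  static String register(List<String> tasks) {
    String compactionId = UUID.randomUUID().toString();
    TASKS.put(compactionId, Collections.unmodifiableList(new ArrayList<>(tasks)));
    return compactionId;
  }

  // Removes and returns the tasks so that entries do not accumulate after a job finishes.
  static List<String> take(String compactionId) {
    List<String> tasks = TASKS.remove(compactionId);
    if (tasks == null) {
      throw new IllegalArgumentException("Unknown compaction id: " + compactionId);
    }
    return tasks;
  }

  public static void main(String[] args) {
    String id = register(Arrays.asList("data/a.parquet", "data/b.parquet"));
    // The id would travel to the source as a read option; the source then looks up its tasks.
    System.out.println(take(id).size() + " task(s) registered under one id");
  }
}
```

The source would skip scan planning whenever the read option is present and read exactly the registered tasks, which avoids both the Spark plan internals of the first option and the extra file writes of the second.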






[GitHub] [iceberg] RussellSpitzer edited a comment on pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer edited a comment on pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#issuecomment-806210557


   Note: this is currently a WIP and is missing some functionality.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609279335



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * Pass an already constructed DataCompactionStrategy
+   *
+   * @param strategy the algorithm to be used during compaction
+   * @return
+   */
+  CompactDataFiles compactionStrategy(DataCompactionStrategy strategy);
+
+  // Todo Do we want to allow generic class loading here? I think yes this will probably be framework specific
+  CompactDataFiles compactionStrategy(String strategyName);

Review comment:
       If we are to support dynamic loading, will it be done here or through an external utility class? How will we configure the instantiated strategy?
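
One possible answer to both questions is a small external loader that instantiates the class reflectively and then configures it through `withOptions`. The sketch below is an assumption, not the PR's design: `LoadableStrategy` is a cut-down, hypothetical stand-in for `DataCompactionStrategy`, and `StrategyLoader` and `ExampleStrategy` are invented names. It requires the strategy class to have a no-arg constructor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, reduced stand-in for DataCompactionStrategy.
interface LoadableStrategy {
  String name();

  LoadableStrategy withOptions(Map<String, String> options);
}

// Example strategy with the no-arg constructor the loader relies on.
final class ExampleStrategy implements LoadableStrategy {
  private final long targetSize;

  ExampleStrategy() {
    this(512L);
  }

  private ExampleStrategy(long targetSize) {
    this.targetSize = targetSize;
  }

  long targetSize() {
    return targetSize;
  }

  @Override
  public String name() {
    return "example";
  }

  @Override
  public LoadableStrategy withOptions(Map<String, String> options) {
    String value = options.get("target_size");
    return value == null ? this : new ExampleStrategy(Long.parseLong(value));
  }
}

final class StrategyLoader {
  private StrategyLoader() {
  }

  // Instantiates the named class reflectively, then applies the options.
  static LoadableStrategy load(String className, Map<String, String> options) {
    try {
      Class<?> clazz = Class.forName(className);
      if (!LoadableStrategy.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException(className + " is not a compaction strategy");
      }
      Object instance = clazz.getDeclaredConstructor().newInstance();
      return ((LoadableStrategy) instance).withOptions(options);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load strategy " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("target_size", "1024");
    LoadableStrategy strategy = load(ExampleStrategy.class.getName(), options);
    System.out.println(strategy.name());
  }
}
```

Keeping the reflection in a utility class would let the `compactionStrategy(String)` overload stay a thin wrapper, with option validation still handled by the strategy itself.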






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609302805



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);

Review comment:
       Ok, I see we combine the `preFilter` expression with the user-defined expression and push it down.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609721494



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);

Review comment:
       Yep, name TBD.
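
Whatever the method ends up being called, the behavior the Javadoc describes (independent jobs, at most `par` running at once) could be sketched with a fixed-size thread pool. This is a hypothetical illustration: `ParallelJobRunner` is an invented name, and jobs are modeled as plain `Callable`s rather than groups of file scan tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelJobRunner {
  private ParallelJobRunner() {
  }

  // Runs every job with at most `parallelism` jobs executing concurrently.
  // Results are returned in the same order as the submitted jobs.
  static <T> List<T> runAll(List<Callable<T>> jobs, int parallelism) {
    if (parallelism < 1) {
      throw new IllegalArgumentException("Parallelism must be at least 1: " + parallelism);
    }

    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<T> results = new ArrayList<>();
      for (Future<T> future : pool.invokeAll(jobs)) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while running compaction jobs", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("A compaction job failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<Callable<String>> jobs = Arrays.asList(
        () -> "job-1 done",
        () -> "job-2 done",
        () -> "job-3 done");
    for (String result : runAll(jobs, 2)) {
      System.out.println(result);
    }
  }
}
```

This sketch fails the whole run on the first job error; a real action might instead commit the jobs that succeeded and report the failures separately.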






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r610966004



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);

Review comment:
       We probably need to think whether this should be expressed using `Iterator` or `Iterable`.
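
One way to weigh the two: an `Iterator` is single-pass, while an `Iterable` can hand out a fresh iterator on every call, so the same input can be scanned more than once. A minimal sketch of that difference, assuming plain strings as stand-ins for `FileScanTask` (the `IterChoice` class and `keepMatching` helpers are hypothetical and not part of this PR):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; String stands in for FileScanTask.
final class IterChoice {
  private IterChoice() {
  }

  // Iterator-based filter: the input is consumed as it is read and cannot be replayed.
  static List<String> keepMatching(Iterator<String> files, String suffix) {
    List<String> kept = new ArrayList<>();
    while (files.hasNext()) {
      String file = files.next();
      if (file.endsWith(suffix)) {
        kept.add(file);
      }
    }
    return kept;
  }

  // Iterable-based filter: asks for a fresh iterator, so callers can pass the same input again.
  static List<String> keepMatching(Iterable<String> files, String suffix) {
    return keepMatching(files.iterator(), suffix);
  }
}
```

With the `Iterator` variant a second call on the same argument sees no elements, which matters if a strategy ever needs two passes over the files of a partition.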






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609285558



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);

Review comment:
       I think we dropped `withXXX` prefixes in v2 actions.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609719445



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);

Review comment:
       That's my current thought, so that classes don't need a custom constructor. Instead they have a no-op constructor and receive all options from the framework through this method ... maybe this is overboard?
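
A rough sketch of that pattern, assuming a strategy that is instantiated through a no-arg constructor and configured afterwards from an options map. The class name `SizeStrategySketch` and the 512 MB default are assumptions made for illustration; only the `target_size` key and the reject-unknown-options behaviour come from the diff:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a DataCompactionStrategy implementation.
final class SizeStrategySketch {
  static final String TARGET_SIZE_OPTION = "target_size";

  private final long targetSize;

  // No-arg constructor, so a framework could create the strategy by class name.
  SizeStrategySketch() {
    this(512L * 1024 * 1024); // assumed default, not taken from the PR
  }

  private SizeStrategySketch(long targetSize) {
    this.targetSize = targetSize;
  }

  Set<String> validOptions() {
    return Collections.singleton(TARGET_SIZE_OPTION);
  }

  // Rejects keys outside validOptions(), then returns a configured copy (or this if nothing is set).
  SizeStrategySketch withOptions(Map<String, String> options) {
    Set<String> unknown = new TreeSet<>(options.keySet());
    unknown.removeAll(validOptions());
    if (!unknown.isEmpty()) {
      throw new IllegalArgumentException("Unknown options " + unknown + ", accepted: " + validOptions());
    }
    String raw = options.get(TARGET_SIZE_OPTION);
    return raw == null ? this : new SizeStrategySketch(Long.parseLong(raw));
  }

  long targetSize() {
    return targetSize;
  }
}
```

Returning a new instance keeps the strategy immutable after configuration; mutating in place would also work if that is preferred.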






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609283260



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * Pass an already constructed DataCompactionStrategy
+   *
+   * @param strategy the algorithm to be used during compaction
+   * @return
+   */
+  CompactDataFiles compactionStrategy(DataCompactionStrategy strategy);
+
+  // Todo Do we want to allow generic class loading here? I think yes this will probably be framework specific
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the
+   * Compaction strategy. This will be used in addition to whatever rules the Compaction strategy
+   * generates.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  class Result {
+    private Map<CompactionJobInfo, CompactionResult> resultMap;
+
+
+    public Result(
+        Map<CompactionJobInfo, CompactionResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionJobInfo, CompactionResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  class CompactionResult {
+    private int numFilesRemoved;
+    private int numFilesAdded;
+
+    public CompactionResult(int numFilesRemoved, int numFilesAdded) {
+      this.numFilesRemoved = numFilesRemoved;
+      this.numFilesAdded = numFilesAdded;
+    }
+
+    public int numFilesAdded() {
+      return numFilesAdded;
+    }
+
+    public int numFilesRemoved() {
+      return numFilesRemoved;
+    }
+  }
+
+  class CompactionJobInfo {
+    private final int jobIndex;

Review comment:
       What is `jobIndex` and what is `partitionIndex`?






[GitHub] [iceberg] RussellSpitzer closed pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer closed pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379


   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609279960



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);

Review comment:
       Is it the proposed `max_concurrent_compactions` in the design doc?
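
For reference, a bounded level of concurrency like the one the Javadoc describes could be sketched with a fixed-size thread pool. This is only an illustration under that assumption; `BoundedJobRunner` and `runAll` are hypothetical names, and the PR does not say how the action will schedule its jobs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: runs independent jobs with at most maxConcurrent of them in flight.
final class BoundedJobRunner {
  private BoundedJobRunner() {
  }

  static <T> List<T> runAll(List<Callable<T>> jobs, int maxConcurrent) {
    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
    try {
      List<T> results = new ArrayList<>();
      // invokeAll blocks until every job has finished and returns futures in submission order.
      for (Future<T> future : pool.invokeAll(jobs)) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for jobs", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("A job failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```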






[GitHub] [iceberg] RussellSpitzer commented on pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#issuecomment-806210557


   Note this is currently a WIP and is missing some functionality.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609969412



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SourceUtil.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl$;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation$;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SourceUtil {
+
+  private SourceUtil() {
+  }
+
+  public static List<DataFile> rewriteFiles(SparkSession spark, Table table, List<FileScanTask> filesToRewrite,

Review comment:
       A summary of what we discussed this morning.
   
   There are 3 options we are considering now:
   - Build a Spark plan and execute it.
     - Too fragile: it depends on low-level Spark details that change frequently.
   - Save the file scan tasks to compact into one or more separate files and pass the location of those files to the source, which would read the tasks from there instead of doing scan planning.
     - Will incur a performance penalty for writing those files.
   - Have a static map/distributed cache (compaction_id -> tasks) that will be propagated in strategies, and pass compaction_id as a read option to either the current source or a custom data source.
     - TBD.
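
To make the third option more concrete, here is a minimal sketch of a static registry keyed by compaction id. Everything in it is hypothetical (`CompactionTaskRegistry`, its method names, and strings standing in for `FileScanTask`). A static map is visible only inside one JVM, so this form assumes the source looks the tasks up in the same process that registered them:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: compaction_id -> tasks to rewrite. String stands in for FileScanTask.
final class CompactionTaskRegistry {
  private static final Map<String, List<String>> TASKS = new ConcurrentHashMap<>();

  private CompactionTaskRegistry() {
  }

  // Stores an immutable copy of the tasks and returns the id to pass as a read option.
  static String register(List<String> tasks) {
    String compactionId = UUID.randomUUID().toString();
    TASKS.put(compactionId, Collections.unmodifiableList(new ArrayList<>(tasks)));
    return compactionId;
  }

  // Called by the source instead of scan planning; fails loudly on an unknown id.
  static List<String> lookup(String compactionId) {
    List<String> tasks = TASKS.get(compactionId);
    if (tasks == null) {
      throw new IllegalArgumentException("No tasks registered for compaction id " + compactionId);
    }
    return tasks;
  }

  // Entries should be removed once the rewrite finishes so the map does not grow without bound.
  static void remove(String compactionId) {
    TASKS.remove(compactionId);
  }
}
```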






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609721103



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * The parallelism level to use when processing jobs generated by the Compaction Strategy. The structure
+   * and contents of the jobs is determined by the Compaction Strategy and options passed to it. When running
+   * each job will be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * Pass an already constructed DataCompactionStrategy
+   *
+   * @param strategy the algorithm to be used during compaction
+   * @return
+   */
+  CompactDataFiles compactionStrategy(DataCompactionStrategy strategy);
+
+  // Todo Do we want to allow generic class loading here? I think yes this will probably be framework specific
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the
+   * Compaction strategy. This will be used in addition to whatever rules the Compaction strategy
+   * generates.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  class Result {
+    private Map<CompactionJobInfo, CompactionResult> resultMap;
+
+
+    public Result(
+        Map<CompactionJobInfo, CompactionResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionJobInfo, CompactionResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  class CompactionResult {
+    private int numFilesRemoved;
+    private int numFilesAdded;
+
+    public CompactionResult(int numFilesRemoved, int numFilesAdded) {
+      this.numFilesRemoved = numFilesRemoved;
+      this.numFilesAdded = numFilesAdded;
+    }
+
+    public int numFilesAdded() {
+      return numFilesAdded;
+    }
+
+    public int numFilesRemoved() {
+      return numFilesRemoved;
+    }
+  }
+
+  class CompactionJobInfo {
+    private final int jobIndex;

Review comment:
       If there are 100 jobs and a particular partition has 5 of them,
   the `jobIndex` identifies which of the 100 jobs this is,
   while the `partitionIndex` identifies which of the 5 jobs within that partition it is.
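
A small sketch of that numbering, assuming zero-based indices and a fixed iteration order over partitions (both are assumptions; `JobIndexSketch`, `Info`, and `number` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a global job index versus a per-partition index.
final class JobIndexSketch {
  private JobIndexSketch() {
  }

  static final class Info {
    final String partition;
    final int jobIndex;        // position among all jobs
    final int partitionIndex;  // position among the jobs of this partition

    Info(String partition, int jobIndex, int partitionIndex) {
      this.partition = partition;
      this.jobIndex = jobIndex;
      this.partitionIndex = partitionIndex;
    }
  }

  // jobsPerPartition maps a partition name to its number of jobs; map order defines the numbering.
  static List<Info> number(Map<String, Integer> jobsPerPartition) {
    List<Info> infos = new ArrayList<>();
    int jobIndex = 0;
    for (Map.Entry<String, Integer> entry : jobsPerPartition.entrySet()) {
      for (int partitionIndex = 0; partitionIndex < entry.getValue(); partitionIndex++) {
        infos.add(new Info(entry.getKey(), jobIndex, partitionIndex));
        jobIndex++;
      }
    }
    return infos;
  }
}
```

The global index keeps increasing across partitions, while the per-partition index restarts at zero for each new partition.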






[GitHub] [iceberg] kbendick commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r605095242



##########
File path: core/src/main/java/org/apache/iceberg/spark/actions/compaction/BinningCompactionStrategy.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.util.StreamUtil.streamOf;
+
+/**
+ * A Compaction strategy for rewriting files based on their sizes, the goal is to end up with files
+ * whose size is close to the Target Size as specified in this strategy's options.
+ */
+public abstract class BinningCompactionStrategy implements DataCompactionStrategy {
+
+
+  public static final String TARGET_SIZE_OPTION = "target_size";
+  public static final String TARGET_THRESHOLD_OPTION = "target_threshold";
+
+  // TODO Maybe these should be global?
+  public static final String MAX_JOB_SIZE_OPTION = "max_job_size";

Review comment:
       Should we consider namespacing some of these configurations (like we do for table properties such as `write.split.*`)? Maybe something like `compaction.binning.target_size` or just `compaction.*`.
   
   I see we don't do that for any of the current actions that are called as stored procedures, so this might have already been discussed or it might be a larger discussion around actions in general and whether or not we'd like to allow for default options to be set which deviate from the values we choose as defaults.
   
   This possibly ties into the consideration of a global configuration for `max_job_size`.
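
As a sketch of what namespaced options might look like on the consuming side, the helper below keeps only the keys under a given prefix and strips that prefix before handing them to a strategy. The `NamespacedOptions` class and `scoped` method are hypothetical, and the `compaction.binning.` prefix is just the example from this comment:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for prefix-scoped options.
final class NamespacedOptions {
  private NamespacedOptions() {
  }

  // Returns the entries whose key starts with prefix, with the prefix removed from each key.
  static Map<String, String> scoped(Map<String, String> all, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : all.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }
}
```

A strategy could then validate the stripped keys (for example `target_size`) against its allowed-list exactly as it does today, while unrelated properties are ignored rather than rejected.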






[GitHub] [iceberg] RussellSpitzer commented on pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#issuecomment-806211441


   @aokolnychyi , @openinx , @zhangjun0x01, @jerryshao 




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609309171



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);

Review comment:
       Am I correct that we are going to propagate all options set in the action?
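
   The question matters because `validateOptions` in the diff above rejects any key that is not in the allow-list. A plain-JDK sketch of that check follows, using a hypothetical class `OptionAllowList` rather than the Guava-based code in the PR. It shows that passing every action-level option through unchanged would fail unless the strategy lists those keys too:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Plain-JDK sketch of the allow-list check in validateOptions; names are illustrative. */
final class OptionAllowList {
  private OptionAllowList() {
  }

  /** Returns the option keys that are not in the allow-list, sorted for stable messages. */
  static Set<String> unknownOptions(Map<String, String> options, Set<String> validOptions) {
    Set<String> unknown = new TreeSet<>(options.keySet());
    unknown.removeAll(validOptions);
    return unknown;
  }

  /** Throws if any option is unknown, with a message similar to the one in the diff. */
  static void validate(String strategyName, Map<String, String> options, Set<String> validOptions) {
    Set<String> unknown = unknownOptions(options, validOptions);
    if (!unknown.isEmpty()) {
      throw new IllegalArgumentException(String.format(
          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
          strategyName, unknown, new TreeSet<>(validOptions)));
    }
  }
}
```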






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609717798



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);

Review comment:
       Currently yes. This is one of the things I think we need to discuss. The current implementation assumes that the framework will group scan tasks by partition before calling this method, but we may want to allow an implementation to do its own grouping at that level as well.
   
   For example, if you want to recompact data into new partitions, you need to group on an arbitrary column set.
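
   One way to picture strategy-controlled grouping is a key function supplied by the strategy instead of a fixed partition key. The sketch below is only an illustration under that assumption; `TaskGrouping` and `groupBy` are hypothetical names, and the element type is left generic so the example runs without Iceberg on the classpath:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: group tasks by a strategy-supplied key rather than a fixed partition. */
final class TaskGrouping {
  private TaskGrouping() {
  }

  /** Groups tasks by the key the strategy extracts, preserving first-seen key order. */
  static <T, K> Map<K, List<T>> groupBy(Iterable<T> tasks, Function<T, K> keyFn) {
    Map<K, List<T>> groups = new LinkedHashMap<>();
    for (T task : tasks) {
      groups.computeIfAbsent(keyFn.apply(task), k -> new ArrayList<>()).add(task);
    }
    return groups;
  }
}
```

   With `FileScanTask` as `T`, the current behavior would correspond to a key function such as `task -> task.file().partition()`, while a repartitioning strategy could supply a key derived from other columns.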






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609286746



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);

Review comment:
       Hm, do we need both `preFilter` and `filesToCompact`? Does it mean we do 2 passes?
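
   For reference, the two hooks need not imply two full passes over the files. The sketch below uses hypothetical stand-in types (`PlannedFile`, `TwoStageSelection`) in place of `FileScanTask` and `Expression`, and assumes the pre-filter is cheap enough to apply during planning. Both filters are chained lazily, so each file is visited once:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical stand-in types; the real API works on FileScanTask and Expression. */
final class TwoStageSelection {
  private TwoStageSelection() {
  }

  /** Minimal stand-in for a planned file: a partition value and a size in bytes. */
  static final class PlannedFile {
    final String partition;
    final long sizeBytes;

    PlannedFile(String partition, long sizeBytes) {
      this.partition = partition;
      this.sizeBytes = sizeBytes;
    }
  }

  /**
   * Applies a planning-time filter (stand-in for preFilter) and then a strategy filter
   * (stand-in for filesToCompact) lazily over the same stream, so files are visited once.
   */
  static List<PlannedFile> select(List<PlannedFile> files, Predicate<PlannedFile> preFilter,
                                  Predicate<PlannedFile> strategyFilter) {
    return files.stream()
        .filter(preFilter)
        .filter(strategyFilter)
        .collect(Collectors.toList());
  }
}
```

   The difference between the two stages in the proposed interface is what they are allowed to express: the first is limited to an Iceberg expression, while the second can run arbitrary code over the tasks that survive planning.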






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609722893



##########
File path: core/src/main/java/org/apache/iceberg/spark/actions/compaction/BinningCompactionStrategy.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.actions.compaction.DataCompactionStrategy;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.util.StreamUtil.streamOf;
+
+/**
+ * A Compaction strategy for rewriting files based on their sizes, the goal is to end up with files
+ * whose size is close to the Target Size as specified in this strategy's options.
+ */
+public abstract class BinningCompactionStrategy implements DataCompactionStrategy {
+
+
+  public static final String TARGET_SIZE_OPTION = "target_size";
+  public static final String TARGET_THRESHOLD_OPTION = "target_threshold";
+
+  // TODO Maybe these should be global?
+  public static final String MAX_JOB_SIZE_OPTION = "max_job_size";

Review comment:
       For some properties I think that makes sense, but it's a little hard to tell at this point whether a parameter is strategy-specific. For example, "target_size" is probably going to be universal.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609784690



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SourceUtil.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl$;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation$;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SourceUtil {
+
+  private SourceUtil() {
+  }
+
+  public static List<DataFile> rewriteFiles(SparkSession spark, Table table, List<FileScanTask> filesToRewrite,
+                                            long targetSize) {
+
+    SparkTable sparkTable = new SparkTable(table, true);
+    StructType sparkSchema = sparkTable.schema();
+
+    // Build Read
+    Scan scan = SparkFileScan.scanOfFiles(spark, table, filesToRewrite, targetSize);

Review comment:
       We could expose this SparkFileScan (the new read) in a Source, but I was having a hard time seeing how we would also get the write plan to work correctly... Maybe I just didn't think about it enough.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609969412



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SourceUtil.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl$;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation$;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SourceUtil {
+
+  private SourceUtil() {
+  }
+
+  public static List<DataFile> rewriteFiles(SparkSession spark, Table table, List<FileScanTask> filesToRewrite,

Review comment:
       A summary of what we discussed this morning.
   
   There are 3 options we are considering now:
   - Build a Spark plan and execute it.
     - Too fragile: it depends on low-level details from Spark that change frequently.
   - Save the file scan tasks to compact into one or more separate files and pass the location of those files to the source, which would read the tasks from there instead of doing scan planning. The file could be a manifest list or something else.
     - Will have a performance penalty for writing the extra files that describe what to compact.
   - Have a static map/distributed cache that will be propagated in strategies (compaction_id -> tasks) and pass compaction_id as a read option to either the current source or to a custom data source.
     - TBD.
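
   The third option can be sketched as a small static registry. This is only an illustration: `CompactionTaskRegistry` and its methods are hypothetical names, and the sketch assumes the action and the source run in the same JVM, since a static map is not visible to other processes:

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the third option: a static map from compaction id to tasks. */
final class CompactionTaskRegistry {
  // Static state is per JVM; a distributed cache would be needed across processes.
  private static final Map<String, List<?>> TASKS = new ConcurrentHashMap<>();

  private CompactionTaskRegistry() {
  }

  /** Stores the tasks and returns a fresh id that could be passed as a read option. */
  static String register(List<?> tasks) {
    String compactionId = UUID.randomUUID().toString();
    TASKS.put(compactionId, tasks);
    return compactionId;
  }

  /** Looks up the tasks for an id; returns null if the id is unknown. */
  static List<?> lookup(String compactionId) {
    return TASKS.get(compactionId);
  }

  /** Removes the entry once the rewrite has finished, so the map does not grow. */
  static List<?> remove(String compactionId) {
    return TASKS.remove(compactionId);
  }
}
```

   The action would call `register` before starting the read, pass the returned id to the source as an option, and call `remove` after the commit. How the id reaches the source is left open here, as it is in the discussion.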






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609313088



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SourceUtil.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl$;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation$;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SourceUtil {
+
+  private SourceUtil() {
+  }
+
+  public static List<DataFile> rewriteFiles(SparkSession spark, Table table, List<FileScanTask> filesToRewrite,

Review comment:
       I think we need to reconsider this. @RussellSpitzer, I remember you mentioned an idea with a custom source. Can we do that here? We could have a static map that can be populated from the action and then the custom data source will read that map and compact the files.






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609708488



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/compaction/SparkBinningCompactionStrategy.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions.compaction;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.source.SourceUtil;
+import org.apache.spark.sql.SparkSession;
+
+public class SparkBinningCompactionStrategy extends BinningCompactionStrategy {

Review comment:
       I was trying to think of a way to do this, but it's difficult because the actual job execution is also algorithm specific. It is both Framework (Spark) and Strategy (binning) specific. Rewriting files by grouping them is one example, but the algorithm could be rewriting files by sorting them.
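
   The split described here can be sketched with stand-in types: planning lives in an engine-neutral base class and execution in an engine-specific subclass. All names below are hypothetical, file sizes stand in for `FileScanTask`, and the greedy packing is a simplification rather than the `BinPacking` utility the PR imports:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: planning in a framework-neutral base class, execution in a subclass. */
abstract class SizeBinningSketch {
  private final long targetSizeBytes;

  SizeBinningSketch(long targetSizeBytes) {
    this.targetSizeBytes = targetSizeBytes;
  }

  /**
   * Framework-neutral: greedily packs file sizes, in order, into groups whose total stays
   * at or below the target size. A file larger than the target gets a group of its own.
   */
  List<List<Long>> planGroups(List<Long> fileSizes) {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentSize = 0;
    for (long size : fileSizes) {
      if (!current.isEmpty() && currentSize + size > targetSizeBytes) {
        groups.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
      current.add(size);
      currentSize += size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  /** Framework-specific: an engine subclass would run the actual rewrite job here. */
  abstract int rewrite(List<Long> group);
}

/** Stand-in for an engine-specific subclass; it only reports how many files it would rewrite. */
final class CountingBinningSketch extends SizeBinningSketch {
  CountingBinningSketch(long targetSizeBytes) {
    super(targetSizeBytes);
  }

  @Override
  int rewrite(List<Long> group) {
    return group.size();
  }
}
```

   A sort-based strategy would keep the same two-level shape but replace both halves: a different grouping rule in the base class and a different job in the engine subclass.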






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609712917



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {
+    return Expressions.alwaysTrue();
+  }
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);
+
+  /**
+   * Groups file scans into lists which will be processed in a single executable
+   * unit. Each group will end up being committed as an independent set of
+   * changes. This creates the jobs which will eventually be run by the underlying Action.
+   *
+   * @param dataFiles iterator of files to be rewritten
+   * @return iterator of sets of files to be processed together
+   */
+  Iterator<List<FileScanTask>> groupsFilesIntoJobs(Iterator<FileScanTask> dataFiles);
+
+  /**
+   * Method which will actually rewrite and commit changes to a group of files
+   * based on the particular CompactionStrategy's Algorithm. This will most likely be
+   * Action framework specific.
+   *
+   * @param table table being modified
+   * @param filesToRewrite a group of files to be rewritten together
+   * @return a list of newly committed files
+   */
+  List<DataFile> rewriteFiles(Table table, List<FileScanTask> filesToRewrite);

Review comment:
       For delete files, I believe we should rely on the underlying framework's implementation of the replace operator. For example, the MergeInto command needs the same "Delete file" removal behavior as this particular rewrite pattern. That said, I still think you can fit just about any delete file compaction into this model as well, since we are passing a list of FileScanTasks, which should enumerate all positional delete files. I think delete file compactions, i.e. merging delete files and converting between types, should be a separate action.
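
To make the flow in the quoted interface concrete (select files, then group them into jobs), here is a minimal stand-alone sketch. It is not the PR's implementation: `FileStub` is a hypothetical placeholder for `FileScanTask`, the size threshold and the greedy packing rule are assumptions chosen for illustration, and delete files are not modeled at all.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in types only: FileStub is a hypothetical placeholder for FileScanTask.
class CompactionFlowSketch {

  static final class FileStub {
    final String path;
    final long sizeBytes;

    FileStub(String path, long sizeBytes) {
      this.path = path;
      this.sizeBytes = sizeBytes;
    }
  }

  // Analogue of filesToCompact: keep only files smaller than the target size (assumed rule).
  static Iterator<FileStub> filesToCompact(Iterator<FileStub> files, long targetBytes) {
    List<FileStub> selected = new ArrayList<>();
    while (files.hasNext()) {
      FileStub file = files.next();
      if (file.sizeBytes < targetBytes) {
        selected.add(file);
      }
    }
    return selected.iterator();
  }

  // Analogue of groupsFilesIntoJobs: greedy, order-preserving packing. A group is closed
  // as soon as adding the next file would push it past targetBytes.
  static Iterator<List<FileStub>> groupFilesIntoJobs(Iterator<FileStub> files, long targetBytes) {
    List<List<FileStub>> groups = new ArrayList<>();
    List<FileStub> current = new ArrayList<>();
    long currentBytes = 0;
    while (files.hasNext()) {
      FileStub file = files.next();
      if (!current.isEmpty() && currentBytes + file.sizeBytes > targetBytes) {
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(file);
      currentBytes += file.sizeBytes;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups.iterator();
  }

  public static void main(String[] args) {
    List<FileStub> files = List.of(
        new FileStub("a", 40), new FileStub("b", 120), new FileStub("c", 50), new FileStub("d", 30));
    Iterator<List<FileStub>> jobs = groupFilesIntoJobs(filesToCompact(files.iterator(), 100), 100);
    while (jobs.hasNext()) {
      System.out.println(jobs.next().size() + " file(s) in job");
    }
  }
}
```

Each list returned by the grouping step corresponds to one unit that the interface would hand to `rewriteFiles` and commit independently.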






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609284610



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();

Review comment:
       Question: is `supportedOptions` a bit more descriptive?
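
For context on what the renamed method would feed into, here is a minimal stand-alone sketch of the allow-list check that `validateOptions` performs in the quoted diff. It uses only `java.util` rather than the relocated Guava `Sets`, and the class name, strategy name, and option keys are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Stand-alone sketch; OptionValidationSketch and the option names are hypothetical.
class OptionValidationSketch {

  // Throws if any supplied option key is missing from the supported set.
  static void validateOptions(String strategyName, Set<String> supportedOptions, Map<String, String> options) {
    // TreeSet gives a stable, sorted order for the error message.
    Set<String> invalid = new TreeSet<>(options.keySet());
    invalid.removeAll(supportedOptions);
    if (!invalid.isEmpty()) {
      throw new IllegalArgumentException(String.format(
          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
          strategyName, invalid, new TreeSet<>(supportedOptions)));
    }
  }

  public static void main(String[] args) {
    Set<String> supported = Set.of("target-file-size-bytes");

    // Accepted: every key is in the supported set.
    validateOptions("binning", supported, Map.of("target-file-size-bytes", "536870912"));

    // Rejected: "max-files" is not a supported key.
    try {
      validateOptions("binning", supported, Map.of("max-files", "10"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Read this way, the set is a list of keys the strategy understands, which is the sense `supportedOptions` would convey.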






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2379: Compaction Strategies

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2379:
URL: https://github.com/apache/iceberg/pull/2379#discussion_r609286012



##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+public interface DataCompactionStrategy extends Serializable {
+
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  default void validateOptions(Map<String, String> options) {
+    Sets.SetView<String> invalidOptions = Sets.difference(options.keySet(), validOptions());
+    if (!invalidOptions.isEmpty()) {
+      String invalidColumnString = invalidOptions.stream().collect(Collectors.joining(",", "[", "]"));
+      String validColumnString = validOptions().stream().collect(Collectors.joining(",", "[", "]"));
+
+      throw new IllegalArgumentException(String.format(
+          "Cannot use strategy %s with unknown options %s. This strategy accepts %s",
+          name(), invalidColumnString, validColumnString));
+    }
+  }
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression
+   * to filter the FileScanTasks which will be created when planning file reads that will later be run through
+   * this compaction strategy.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  default Expression preFilter() {

Review comment:
       I think we will need to play around with the name here.
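
Whatever name is chosen, the behavior in the quoted Javadoc is a planning-time filter that defaults to matching everything. A minimal stand-alone sketch of that pattern follows. It does not use Iceberg types: `Predicate<String>` is a hypothetical stand-in for an Iceberg `Expression`, partition names stand in for `FileScanTask`s, and `plan` is an assumed helper playing the role of the action.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-ins: Predicate<String> plays the role of an Iceberg Expression,
// and partition names play the role of FileScanTasks.
class PreFilterSketch {

  interface Strategy {
    // Default mirrors Expressions.alwaysTrue(): no pruning unless a strategy overrides it.
    default Predicate<String> preFilter() {
      return partition -> true;
    }
  }

  // The action applies the strategy's filter while planning, before any strategy rules run.
  static List<String> plan(Strategy strategy, List<String> partitions) {
    return partitions.stream().filter(strategy.preFilter()).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> partitions = List.of("day=2021-03-23", "day=2021-03-24");

    // Uses the default filter, so every partition is planned.
    Strategy everything = new Strategy() {};

    // Overrides the filter to restrict planning to a single partition.
    Strategy latestOnly = new Strategy() {
      @Override
      public Predicate<String> preFilter() {
        return partition -> partition.equals("day=2021-03-24");
      }
    };

    System.out.println(plan(everything, partitions));
    System.out.println(plan(latestOnly, partitions));
  }
}
```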



