You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/15 19:50:25 UTC

[GitHub] [iceberg] RussellSpitzer opened a new pull request #2829: Spark3 sort compaction

RussellSpitzer opened a new pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829


   WIP Based on Other PR's we need in first 
   #2820 
   #2828 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] adamkennedy commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
adamkennedy commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r676984533



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, numOutputFiles, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")

Review comment:
       Append? Wouldn't this result in a read query having to read both the sorted version and the original and fold them together, unless a subsequent compaction of the partition occurs, which might not preserve all of the sorting?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r676986074



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, numOutputFiles, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")

Review comment:
       @aokolnychyi  :) 
   
   Anton worried about this naming when we designed this special write task. It doesn't actually append to the table in this action, instead it stages these files to be committed in the rewrite operation performed by the base action. So the "append" here really just means "write these files but don't add them to the table". This is triggered by the REWRITTEN_FILE_SCAN_TASK_SET_ID parameter which causes us not to use the normal write path. Sorry it's pretty weird, but we are basically trying to hijack the existing spark write mechanism and while adding a side-effect to track which files were written.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705743430



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);

Review comment:
       What about "compression factor" or something similar? That seems like what you mean by this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705735949



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       What was the failure? Is our hash or equals function not correct for empty structs? Should this use a `StructLikeMap` instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705741986



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";

Review comment:
       It isn't clear to me from the name what this does.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705753743



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       Okay, so all data where we're changing the partitioning goes into a separate task. That makes sense to me. Probably worth a comment here to explain it.
   
   You may also want to change how this works slightly and use a `StructLikeMap`. Otherwise, `EmptyStruct` looks a lot like structs for the unpartitioned spec. It would or wouldn't be equal depending on the order of the comparison, which probably isn't what you want.
   
   Instead, I would keep a `StructLikeMap` for the current partitioning scheme and then create a separate group using a `List` for the rest.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#issuecomment-916269130


   @aokolnychyi + @rdblue Now that we've moved distribution and sort utils, we can get this implementation in as well :) Please review when you have a chance


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706286280



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);

Review comment:
       I think that sounds good, I think the only reason I had this here is that I have another PR for allowing compacting between different specs for BinPack as well and it used basically this same branching logic. For sort I think you are correct and we are fine just always including the partition columns ... I don't think this would effect the range partitioner even if all the values are the same.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705742827



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";

Review comment:
       Per input file? Basically the default is to balance the data across the same number of files? Seems like we would prefer to try to output `totalInputSize / tableTargetFileSize`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705737663



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    // Perform a rewrite but only allow 2 files to be compacted at a time
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort()
+            .option(SortStrategy.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
+            .option(BinPackStrategy.MIN_INPUT_FILES, "1")

Review comment:
       Looks like the next test uses `SortStrategy.MIN_INPUT_FILES`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731130809



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -148,10 +170,29 @@ public RewriteDataFiles filter(Expression expression) {
         .planFiles();
 
     try {
-      Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+      StructType partitionType = table.spec().partitionType();
+      StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+      StructLike emptyStruct = GenericRecord.create(partitionType);
+
+      fileScanTasks.forEach(task -> {
+        /*
+        If a task uses an incompatible partition spec the data inside could contain values which
+        belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
+        grouping them together helps to minimize new files made.
+        */

Review comment:
       Style: we typically use `//` even for multi-line comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731135788



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);

Review comment:
       Did you want to make this change, or leave it as is?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706291305



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;

Review comment:
       Let's omit it for now then. No sense in adding it if it won't work, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705519520



##########
File path: build.gradle
##########
@@ -989,6 +989,11 @@ project(':iceberg-spark3') {
   apply plugin: 'scala'
 
   sourceSets {
+    main {

Review comment:
       @rdblue never-mind this IS required if you want to do cross compilation (Have Java code depend on Scala code in the same module)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705738416



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    // Perform a rewrite but only allow 2 files to be compacted at a time
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort()
+            .option(SortStrategy.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
+            .option(BinPackStrategy.MIN_INPUT_FILES, "1")
+            .execute();
+
+    Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10);
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+    shouldHaveSnapshots(table, 2);
+    shouldHaveACleanCache(table);

Review comment:
       Looks like this is probably because you don't want to use `shouldHaveLastCommitSorted(table, "c2");`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r707732248



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")
+          .save(table.name());
+
+      return rewriteCoordinator.fetchNewDataFiles(table, groupID);
+    } finally {
+      manager.removeTasks(table, groupID);
+      rewriteCoordinator.clearRewrite(table, groupID);
+    }
+  }
+
+  protected SparkSession spark() {
+    return this.spark;
+  }
+
+  protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) {
+    Seq<SortOrder> scalaOrder = JavaConverters.asScalaBuffer(Lists.newArrayList(ordering));
+    return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, scalaOrder, plan, conf);

Review comment:
       We can pass Array into a method that requires a Seq in Scala, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r707715564



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";

Review comment:
       Removed this since we don't have the output task combiner code in this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731319973



##########
File path: spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SortOrderUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  /**
+   * The number of shuffle partitions and consequently the number of output files
+   * created by the Spark Sort is based on the size of the input data files used
+   * in this rewrite operation. Due to compression, the disk file sizes may not
+   * accurately represent the size of files in the output. This parameter lets
+   * the user adjust the file size used for estimating actual output data size. A
+   * factor greater than 1.0 would generate more files than we would expect based
+   * on the on-disk file size. A value less than 1.0 would create fewer files than
+   * we would expect due to the on-disk size.
+   */
+  public static final String COMPRESSION_FACTOR = "compression-factor";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";

Review comment:
       yep will remove it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706268664



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;

Review comment:
       Sorry this is part of our internal implementation with an additional feature that @aokolnychyi cooked up. We add in a special combiner that allows multiple shuffle tasks to end up writing to the same file, this helped us out when users have an especially large target size but can't handle a shuffle with outputs that large.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706267593



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);

Review comment:
       It's the difference between running a repartition with the Partition Columns included in the ordering or just using the requested sort Columns. We can probably combine the logic here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705565428



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -383,4 +408,31 @@ public int totalGroupCount() {
       return totalGroupCount;
     }
   }
+
+  static class EmptyStruct implements StructLike {
+
+    private static EmptyStruct instance;
+
+    static EmptyStruct instance() {

Review comment:
       For most singletons, we use `public static EmptyStruct get()` and do the initialization at class init time to avoid a race condition:
   
   ```java
     private static final EmptyStruct INSTANCE = new EmptyStruct();
     static EmptyStruct get() {
       return INSTANCE;
     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705737236



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    // Perform a rewrite but only allow 2 files to be compacted at a time
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort()
+            .option(SortStrategy.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
+            .option(BinPackStrategy.MIN_INPUT_FILES, "1")
+            .execute();
+
+    Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10);
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+    shouldHaveSnapshots(table, 2);
+    shouldHaveACleanCache(table);

Review comment:
       Shouldn't this validate that the table only has 10 files now and that the files are sorted by c2?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r707760178



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")
+          .save(table.name());
+
+      return rewriteCoordinator.fetchNewDataFiles(table, groupID);
+    } finally {
+      manager.removeTasks(table, groupID);
+      rewriteCoordinator.clearRewrite(table, groupID);
+    }
+  }
+
+  protected SparkSession spark() {
+    return this.spark;
+  }
+
+  protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) {
+    Seq<SortOrder> scalaOrder = JavaConverters.asScalaBuffer(Lists.newArrayList(ordering));
+    return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, scalaOrder, plan, conf);

Review comment:
       I meant that our SparkExtensions will still have the Scala version incompatibility. I switched this to array here though just in case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706284605



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;

Review comment:
       Ah, something like keeping files open across tasks in your internal version? Do you plan to remove this from open source, or is this something we need to keep so you don't have too many deltas from master?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706491378



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       I'm not quite sure I understand what you are describing here as a fix.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705562721



##########
File path: build.gradle
##########
@@ -989,6 +989,11 @@ project(':iceberg-spark3') {
   apply plugin: 'scala'
 
   sourceSets {
+    main {

Review comment:
       Do we need the Java plugin if we are using Scala here? I'm wondering why we were using both in the past. Maybe we only had Scala depending on Java code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#issuecomment-946203193


   Thanks @rdblue for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706502816



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    // Perform a rewrite but only allow 2 files to be compacted at a time
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort()
+            .option(SortStrategy.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
+            .option(BinPackStrategy.MIN_INPUT_FILES, "1")

Review comment:
       I forgot I set REWRITE ALL to only apply to filtering and not to the group limits, I should make it apply to both




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705741534



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       The issue is for files with the old partitioning, if we have another way of making an empty struct here that's fine. I can't remember why I chose to make a new class here since it was a while ago now.
   
   The core issue is
   Say we have originally have a table which is partitioned Bucket(x, 5) meaning any original data files are written with values of x more or less randomly distributed in our data files. Then our table has a partitioning changed to something like Bucket(x, 10). Worst case scenario is that when we write we end up having to make 10 files for every partition in our original bucketing. This says let's just assume all those files created with old partitioning are best dealt with at the same time, rather than splitting them up using the old partitioning.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705564261



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -383,4 +408,31 @@ public int totalGroupCount() {
       return totalGroupCount;
     }
   }
+
+  static class EmptyStruct implements StructLike {
+
+    private static EmptyStruct instance;
+
+    static EmptyStruct instance() {
+      if (instance == null) {
+        instance = new EmptyStruct();
+      }
+      return instance;
+    }
+
+    @Override
+    public int size() {
+      return 0;
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return null;
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+

Review comment:
       Should this throw an exception because you're trying to write to a slot that doesn't exist?

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -383,4 +408,31 @@ public int totalGroupCount() {
       return totalGroupCount;
     }
   }
+
+  static class EmptyStruct implements StructLike {
+
+    private static EmptyStruct instance;
+
+    static EmptyStruct instance() {
+      if (instance == null) {
+        instance = new EmptyStruct();
+      }
+      return instance;
+    }
+
+    @Override
+    public int size() {
+      return 0;
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return null;

Review comment:
       Should this throw an exception?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706495504



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       `StructLike` makes no guarantees about `equals`/`hashCode` behavior, so using it as a map key is like using a `CharSequence` as a map key. Probably not a good idea because it will break if the underlying implementation changes or differs. I'd recommend using `StructLikeMap` that handles consistent behavior. But that requires using a specific struct type. Since you really only need the table spec's struct type, you can use that. Then keep any tasks that are not in the current table spec in a list, like this:
   
   ```java
   StructLikeMap<FileScanTask> filesByPartition = StructLikeMap.create(table.spec().partitionType());
   List<FileScanTask> tasksFromOtherSpecs = Lists.newArrayList();
   Streams.stream(fileScanTasks).forEach(task -> {
       if (task.file().specId() != table.spec().specId()) {
         tasksFromOtherSpecs.add(task);
       } else {
         filesByPartition.put(task.file().partition(), task);
       }
     });
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r707730166



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")
+          .save(table.name());
+
+      return rewriteCoordinator.fetchNewDataFiles(table, groupID);
+    } finally {
+      manager.removeTasks(table, groupID);
+      rewriteCoordinator.clearRewrite(table, groupID);
+    }
+  }
+
+  protected SparkSession spark() {
+    return this.spark;
+  }
+
+  protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) {
+    Seq<SortOrder> scalaOrder = JavaConverters.asScalaBuffer(Lists.newArrayList(ordering));
+    return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, scalaOrder, plan, conf);

Review comment:
       We are using this as Seq in our Scala code, I can change the API to always use array but I'm not sure that helps us if we have other library code using Seq?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731130394



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -100,12 +105,29 @@ protected Table table() {
    */
   protected abstract BinPackStrategy binPackStrategy();
 
+  /**
+   * The framework specific {@link SortStrategy}
+   */
+  protected abstract SortStrategy sortStrategy();
+
   @Override
   public RewriteDataFiles binPack() {
     this.strategy = binPackStrategy();
     return this;
   }
 
+  @Override
+  public RewriteDataFiles sort(SortOrder sortOrder) {
+    this.strategy = sortStrategy().sortOrder(sortOrder);

Review comment:
       Should we add a check that this is only replacing the default `binPack` strategy? Right now, we would allow calling `binPack().sort(SortOrder...build()).execuite()` which would be strange.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731319773



##########
File path: spark/v3.0/build.gradle
##########
@@ -20,6 +20,13 @@
 project(':iceberg-spark:iceberg-spark3') {
   apply plugin: 'scala'
 
+  sourceSets {
+    main {
+      scala.srcDirs = ['src/main/scala', 'src/main/java']
+      java.srcDirs = []
+    }
+  }
+

Review comment:
       It only builds fine if you aren't accessing Scala code from Java code. It works fine in the other direction, see 
   https://github.com/apache/iceberg/pull/2829#discussion_r705519520




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705741920



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    // Perform a rewrite but only allow 2 files to be compacted at a time
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort()
+            .option(SortStrategy.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
+            .option(BinPackStrategy.MIN_INPUT_FILES, "1")

Review comment:
       I guess this isn't needed now that I added REWRITE_ALL, This can be removed now




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705749012



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")
+          .save(table.name());
+
+      return rewriteCoordinator.fetchNewDataFiles(table, groupID);
+    } finally {
+      manager.removeTasks(table, groupID);
+      rewriteCoordinator.clearRewrite(table, groupID);
+    }
+  }
+
+  protected SparkSession spark() {
+    return this.spark;
+  }
+
+  protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) {
+    Seq<SortOrder> scalaOrder = JavaConverters.asScalaBuffer(Lists.newArrayList(ordering));
+    return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, scalaOrder, plan, conf);

Review comment:
       Is it possible to use a List or Array for order rather than creating a Seq and passing that? Using Seq in our APIs can cause compatibility problems across Scala versions. It would be better to pass everything as an Array.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706503349



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       Yeah, you may need to pass an empty struct through the rest of the code. But it is probably a good idea not to use StructLike as a map key.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705567902



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, numOutputFiles, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")
+          .save(table.name());
+
+      return rewriteCoordinator.fetchNewDataFiles(table, groupID);
+    } finally {
+      manager.removeTasks(table, groupID);
+      rewriteCoordinator.clearRewrite(table, groupID);
+    }
+  }
+
+  protected SparkSession spark() {
+    return this.spark;
+  }
+
+  protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, long numOutputFiles,
+      LogicalPlan plan, SQLConf conf) {

Review comment:
       We'd normally wrap the line to start at the same position as `Distribution`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706289100



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;

Review comment:
       I think we want to bring in the combiner to OSS as well but I believe it requires some additional catalyst extending. I'll remove it for now and @aokolnychyi and I can discuss whether to bring that feature into OSS. I think the only issue is that we will have to add more Spark Version dependent scala code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer closed pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer closed pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731138629



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  /**
+   * The number of shuffle partitions and consequently the number of output files
+   * created by the Spark Sort is based on the size of the input data files used
+   * in this rewrite operation. Due to compression, the disk file sizes may not
+   * accurately represent the size of files in the output. This parameter lets
+   * the user adjust the file size used for estimating actual output data size. A
+   * factor greater than 1.0 would generate more files than we would expect based
+   * on the on-disk file size. A value less than 1.0 would create fewer files than
+   * we would expect due to the on-disk size.
+   */
+  public static final String COMPRESSION_FACTOR = "compression-factor";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(COMPRESSION_FACTOR)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        COMPRESSION_FACTOR,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Invalid compression factor: %s (not positive)", sizeEstimateMultiple);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering = Spark3Util.convert(sortOrder());
+    Distribution distribution;
+
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),

Review comment:
       This is going to use the table order if it is RANGE partitioned and has a table ordering. That will override the given `ordering` from the action config.
   
   I think you should do something similar to `Spark3Util.buildRequiredDistribution` but pass around the action's `sortOrder()` rather than using the table's sort order. Also, the mode would always be range, so you can simplify it to just this:
   
   ```java
   ordering = Spark3Util.convert(SortOrderUtil.buildSortOrder(table.schema(), table.spec(), sortOrder()))
   distribution = Distribution.ordered(ordering);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731275234



##########
File path: spark/v3.0/build.gradle
##########
@@ -20,6 +20,13 @@
 project(':iceberg-spark:iceberg-spark3') {
   apply plugin: 'scala'
 
+  sourceSets {
+    main {
+      scala.srcDirs = ['src/main/scala', 'src/main/java']
+      java.srcDirs = []
+    }
+  }
+

Review comment:
       Why is this needed now? I thought that everything built just fine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731368710



##########
File path: spark/v3.0/build.gradle
##########
@@ -20,6 +20,13 @@
 project(':iceberg-spark:iceberg-spark3') {
   apply plugin: 'scala'
 
+  sourceSets {
+    main {
+      scala.srcDirs = ['src/main/scala', 'src/main/java']
+      java.srcDirs = []
+    }
+  }
+

Review comment:
       Okay, so this was working fine because we hadn't accessed Scala from Java yet. Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706283695



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);

Review comment:
       Why would we not use the partition columns in the sort? It seems to me that we want to always go through the utility methods to build the distribution and sort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705741508



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java
##########
@@ -35,6 +36,11 @@ protected BinPackStrategy binPackStrategy() {
     return new Spark3BinPackStrategy(table(), spark());
   }
 
+  @Override
+  protected SortStrategy sortStrategy() {

Review comment:
       This is a little strange to me. It feels more like a `createSortStrategy()`, but I think it's fine since it isn't exposed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705742453



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java
##########
@@ -35,6 +36,11 @@ protected BinPackStrategy binPackStrategy() {
     return new Spark3BinPackStrategy(table(), spark());
   }
 
+  @Override
+  protected SortStrategy sortStrategy() {

Review comment:
       Yeah this is just copying what we did for binPack ^ but we can change them both, if needed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731275586



##########
File path: spark/v3.0/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SortOrderUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  /**
+   * The number of shuffle partitions and consequently the number of output files
+   * created by the Spark Sort is based on the size of the input data files used
+   * in this rewrite operation. Due to compression, the disk file sizes may not
+   * accurately represent the size of files in the output. This parameter lets
+   * the user adjust the file size used for estimating actual output data size. A
+   * factor greater than 1.0 would generate more files than we would expect based
+   * on the on-disk file size. A value less than 1.0 would create fewer files than
+   * we would expect due to the on-disk size.
+   */
+  public static final String COMPRESSION_FACTOR = "compression-factor";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";

Review comment:
       Looks like this isn't used. Can we remove it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705744024



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",

Review comment:
       I think this should be simpler. How about `"Invalid size estimation factor: %s (not positive)"`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r707714559



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";

Review comment:
       I added in a description, we can discuss alternatives with that context. The current difficulty is that at this point in time we know task file sizes and our nob for adjusting output size is the number of shuffle partitions we create. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705564248



##########
File path: build.gradle
##########
@@ -989,6 +989,11 @@ project(':iceberg-spark3') {
   apply plugin: 'scala'
 
   sourceSets {
+    main {

Review comment:
       Yeah the other direction is fine, we don't have the Java plugin set for this project but the Scala plugin does extends the Java one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705567485



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, numOutputFiles, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")

Review comment:
       This sounds like it's worth a comment to remind the reader that this doesn't actually append data.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705746194



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);

Review comment:
       I don't quite understand the logic here. Won't an ordered distribution result in a repartition as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705519520



##########
File path: build.gradle
##########
@@ -989,6 +989,11 @@ project(':iceberg-spark3') {
   apply plugin: 'scala'
 
   sourceSets {
+    main {

Review comment:
       @rdblue never-mind this IS required if you want to do cross compilation (Have Java code depend on Scala code in the same module)
   
   
   Otherwise it compiles Java without the Scala source before the Scala code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731134604



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";

Review comment:
       Should this constant also be removed then?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731138629



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  /**
+   * The number of shuffle partitions and consequently the number of output files
+   * created by the Spark Sort is based on the size of the input data files used
+   * in this rewrite operation. Due to compression, the disk file sizes may not
+   * accurately represent the size of files in the output. This parameter lets
+   * the user adjust the file size used for estimating actual output data size. A
+   * factor greater than 1.0 would generate more files than we would expect based
+   * on the on-disk file size. A value less than 1.0 would create fewer files than
+   * we would expect due to the on-disk size.
+   */
+  public static final String COMPRESSION_FACTOR = "compression-factor";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(COMPRESSION_FACTOR)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        COMPRESSION_FACTOR,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Invalid compression factor: %s (not positive)", sizeEstimateMultiple);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering = Spark3Util.convert(sortOrder());
+    Distribution distribution;
+
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),

Review comment:
       This is going to use the table order if it is RANGE partitioned and has a table ordering. That will override the given `ordering` from the action config.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731237603



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  /**
+   * The number of shuffle partitions and consequently the number of output files
+   * created by the Spark Sort is based on the size of the input data files used
+   * in this rewrite operation. Due to compression, the disk file sizes may not
+   * accurately represent the size of files in the output. This parameter lets
+   * the user adjust the file size used for estimating actual output data size. A
+   * factor greater than 1.0 would generate more files than we would expect based
+   * on the on-disk file size. A value less than 1.0 would create fewer files than
+   * we would expect due to the on-disk size.
+   */
+  public static final String COMPRESSION_FACTOR = "compression-factor";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(COMPRESSION_FACTOR)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        COMPRESSION_FACTOR,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Invalid compression factor: %s (not positive)", sizeEstimateMultiple);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering = Spark3Util.convert(sortOrder());
+    Distribution distribution;
+
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),

Review comment:
       Good call, refactored this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] adamkennedy commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
adamkennedy commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706501267



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";

Review comment:
       For our in-house implementation of this compaction strategy the advanced user bailout approach used "rows per file" which mostly worked but wasn't stable in the face of extensive schema changes (file sized grew with column growth) but was largely stable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706493679



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       Actually I think I see now. Should have this in a bit




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706497770



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       The only issue I can think of here is that it doesn't fit well with the rest of the code which assumes all FileGroups have an associated partition value, I think empty is the right thing to pass since we are basically treating those files as unpartitioned.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705747420



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;

Review comment:
       Why increase the number of tasks? Won't that cause more output files than `numOutputFiles` to be created?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705745623



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());

Review comment:
       Why not set the default in the variable declaration?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706503451



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    // Perform a rewrite but only allow 2 files to be compacted at a time
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort()
+            .option(SortStrategy.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
+            .option(BinPackStrategy.MIN_INPUT_FILES, "1")
+            .execute();
+
+    Assert.assertEquals("Should have 10 fileGroups", result.rewriteResults().size(), 10);
+
+    table.refresh();
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
+
+    shouldHaveSnapshots(table, 2);
+    shouldHaveACleanCache(table);

Review comment:
       Ah yeah the result of this would not be completely sorted since it groups together 2 files at a time (pretty much at random) so the results shouldn't be globally sorted




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705743410



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";

Review comment:
       Yeah let me work on that naming/documenting. This is from a discussion with @aokolnychyi where we were trying to figure out how to deal with the shuffle size estimation and trying to give users a way to bail out of the automatic sizing. Let me go back to our notes and fix the name here. This is meant to be an advanced user escape valve. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] adamkennedy commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
adamkennedy commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r676984533



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, numOutputFiles, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")

Review comment:
       Append? Wouldn't this result in a read query having to read both the sorted version and the original and fold them together, unless a subsequent compaction of the partition occurs, which might not preserve all of the sorting?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705736794



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();
+    int fileSize = averageFileSize(table);
+
+    List<Object[]> originalData = currentData();
+
+    // Perform a rewrite but only allow 2 files to be compacted at a time
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .sort()
+            .option(SortStrategy.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.MAX_FILE_GROUP_SIZE_BYTES, Integer.toString(fileSize * 2 + 100))
+            .option(BinPackStrategy.MIN_INPUT_FILES, "1")

Review comment:
       Why use `BinPackStrategy.MIN_INPUT_FILES` with a sort strategy? Should this be moved or renamed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705737464



##########
File path: spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java
##########
@@ -590,6 +595,156 @@ public void testInvalidOptions() {
             .execute());
   }
 
+  @Test
+  public void testSortMultipleGroups() {
+    Table table = createTable(20);
+    shouldHaveFiles(table, 20);
+    table.replaceSortOrder().asc("c2").commit();

Review comment:
       Why doesn't this use `shouldHaveLastCommitUnsorted(table, "c2")`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706502243



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.get();

Review comment:
       Trying something like
   ```
         StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(table.spec().partitionType());
         StructLike emptyStruct = GenericRecord.create(table.schema());
   
         fileScanTasks.forEach(task -> {
           /*
           If a task uses an incompatible partition spec the data inside could contain values which
           belong to multiple partitions in the current spec. Treating all such files as un-partitioned and
           grouping them together helps to minimize new files made.
           */
           StructLike taskPartition = task.file().specId() == table.spec().specId() ?
               task.file().partition() : emptyStruct;
   
           List<FileScanTask> files = filesByPartition.get(taskPartition);
           if (files == null) {
             files = Lists.newArrayList();
           }
   
           files.add(task);
           filesByPartition.put(taskPartition, files);
         });
         ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r705523413



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
 
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition = Streams.stream(fileScanTasks)
-          .collect(Collectors.groupingBy(task -> task.file().partition()));
+          .collect(Collectors.groupingBy(task -> {
+            if (task.file().specId() == table.spec().specId()) {
+              return task.file().partition();
+            } else {
+              return EmptyStruct.instance();

Review comment:
       Tasks which are comprised of data which is not partitioned according to the curtain spec must treated as if they were not partitioned. We can probably ease this restriction for partitioning that satisfies the current partitioning (Ie Table is set to partition on day but this is an hour partition) but this is the simplest approach for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] adamkennedy commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
adamkennedy commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r676984533



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, numOutputFiles, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")

Review comment:
       Append? Wouldn't this result in a read query having to read both the sorted version and the original and fold them out, unless a subsequent compaction occurs, which might not preserve all of the sorting?

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.RewriteStrategy;
+import org.apache.iceberg.actions.SortStrategy;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.iceberg.distributions.Distribution;
+import org.apache.spark.sql.connector.iceberg.distributions.Distributions;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+public class Spark3SortStrategy extends SortStrategy {
+
+  public static final String SIZE_ESTIMATE_MULTIPLE = "size-estimate-multiple";
+
+  public static final String SHUFFLE_TASKS_PER_FILE = "shuffle-tasks-per-file";
+  public static final int SHUFFLE_TASKS_PER_FILE_DEFAULT = 1;
+
+  private final Table table;
+  private final SparkSession spark;
+  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
+
+  private double sizeEstimateMultiple;
+  private int shuffleTasksPerFile;
+
+  public Spark3SortStrategy(Table table, SparkSession spark) {
+    this.table = table;
+    this.spark = spark;
+  }
+
+  @Override
+  public Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(SIZE_ESTIMATE_MULTIPLE)
+        .add(SHUFFLE_TASKS_PER_FILE)
+        .build();
+  }
+
+  @Override
+  public RewriteStrategy options(Map<String, String> options) {
+    sizeEstimateMultiple = PropertyUtil.propertyAsDouble(options,
+        SIZE_ESTIMATE_MULTIPLE,
+        1.0);
+
+    Preconditions.checkArgument(sizeEstimateMultiple > 0,
+        "Cannot use Spark3Sort Strategy without %s being positive, found %s",
+        SIZE_ESTIMATE_MULTIPLE, sizeEstimateMultiple);
+
+    shuffleTasksPerFile = PropertyUtil.propertyAsInt(options,
+        SHUFFLE_TASKS_PER_FILE,
+        SHUFFLE_TASKS_PER_FILE_DEFAULT);
+
+    Preconditions.checkArgument(shuffleTasksPerFile >= 1,
+        "Cannot use Spark3Sort Strategy as option %s must be >= 1, found %s",
+        SHUFFLE_TASKS_PER_FILE, shuffleTasksPerFile);
+
+    return super.options(options);
+  }
+
+  @Override
+  public Set<DataFile> rewriteFiles(List<FileScanTask> filesToRewrite) {
+    String groupID = UUID.randomUUID().toString();
+    boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table.spec());
+    SortOrder[] ordering;
+    Distribution distribution;
+    ordering = Spark3Util.convert(sortOrder());
+    if (requiresRepartition) {
+      distribution = Spark3Util.buildRequiredDistribution(table);
+      ordering = Stream.concat(
+          Arrays.stream(Spark3Util.buildRequiredOrdering(distribution, table())),
+          Arrays.stream(ordering)).toArray(SortOrder[]::new);
+    } else {
+      distribution = Distributions.ordered(ordering);
+    }
+
+    manager.stageTasks(table, groupID, filesToRewrite);
+
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    SparkSession cloneSession = spark.cloneSession();
+    cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+
+    // Reset Shuffle Partitions for our sort
+    long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
+    long numShufflePartitions = numOutputFiles * shuffleTasksPerFile;
+    cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numShufflePartitions));
+
+    Dataset<Row> scanDF = cloneSession.read().format("iceberg")
+        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+        .load(table.name());
+
+    // write the packed data into new files where each split becomes a new file
+    try {
+      SQLConf sqlConf = cloneSession.sessionState().conf();
+      LogicalPlan sortPlan = sortPlan(distribution, ordering, numOutputFiles, scanDF.logicalPlan(), sqlConf);
+      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+
+      sortedDf.write()
+          .format("iceberg")
+          .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
+          .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+          .mode("append")

Review comment:
       Append? Wouldn't this result in a read query having to read both the sorted version and the original and fold them together, unless a subsequent compaction occurs, which might not preserve all of the sorting?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#issuecomment-918632089


   @rdblue + @adamkennedy Changes applied, if you have time please take another look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer merged pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
RussellSpitzer merged pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2829: Spark3 sort compaction

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r731132040



##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -383,4 +424,28 @@ public int totalGroupCount() {
       return totalGroupCount;
     }
   }
+
+  static class EmptyStruct implements StructLike {

Review comment:
       Looks like this is no longer needed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org