Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/04/29 22:38:55 UTC

[GitHub] [iceberg] yyanyy commented on a change in pull request #2501: API: API For CompactDataFiles and DataCompactionStrategy

yyanyy commented on a change in pull request #2501:
URL: https://github.com/apache/iceberg/pull/2501#discussion_r623443530



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when the entire job has completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  // TODO be set once we have an ENUM in core
+  // String COMPACTION_STRATEGY_DEFAULT;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max-chunk-size-bytes";

Review comment:
       Do we want to define what a "chunk" is here? It sounds like a collection of data files that are processed together (under one Spark job?), but that isn't obvious to me from reading this class. If my assumption is correct, what happens when a single file is larger than this threshold? I'd imagine we treat it as a chunk by itself (instead of splitting it into multiple chunks), and I'd hope we can describe that here.
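       To make the question concrete, here is a minimal sketch of the behavior I am assuming. It is not the PR's implementation: the class `ChunkGroupingSketch` and the method `groupIntoChunks` are hypothetical names, and files are represented only by their sizes in bytes. Files are packed greedily in order, and a file larger than the limit is never split; it simply becomes a chunk by itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the PR's implementation: greedy grouping of file sizes into chunks.
public class ChunkGroupingSketch {

  // Walks the file sizes (in bytes) in order and closes the current chunk
  // when adding the next file would push it past maxChunkSizeBytes.
  // Assumption: a file larger than the limit is not split and forms its own chunk.
  public static List<List<Long>> groupIntoChunks(List<Long> fileSizes, long maxChunkSizeBytes) {
    List<List<Long>> chunks = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentSize = 0L;

    for (long size : fileSizes) {
      if (!current.isEmpty() && currentSize + size > maxChunkSizeBytes) {
        chunks.add(current);
        current = new ArrayList<>();
        currentSize = 0L;
      }
      current.add(size);
      currentSize += size;
    }

    if (!current.isEmpty()) {
      chunks.add(current);
    }
    return chunks;
  }

  public static void main(String[] args) {
    // With a limit of 100 bytes, the 250-byte file exceeds the limit and ends up alone.
    System.out.println(groupIntoChunks(List.of(40L, 50L, 250L, 30L, 60L), 100L));
    // prints [[40, 50], [250], [30, 60]]
  }
}
```

       If this matches the intent, the Javadoc could say that the limit is a soft bound that an individual oversized file may exceed.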

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of Chunk information to the result of that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {
+    private Map<CompactionChunkInfo, CompactionChunkResult> resultMap;
+
+
+    public Result(Map<CompactionChunkInfo, CompactionChunkResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionChunkInfo, CompactionChunkResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  interface CompactionChunkResult {
+
+    int numFilesAdded();
+
+    int numFilesRemoved();
+  }
+
+  interface CompactionChunkInfo {
+
+    /**
+     * returns which chunk this is out of the total set of chunks for this compaction
+     */
+    int chunkIndex();

Review comment:
       If we do want this, should we rename it to `globalIndex`/`globalChunkIndex`? Also, unless we report the total number of chunks in the compaction operation somewhere, the index of an individual finished chunk doesn't tell us how many chunks remain. How do we plan to convey the total number of chunks we will process? Is the plan to check the size of the result of `DataCompactionStrategy.groupFilesIntoChunks()`?
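       One possible shape, purely as a sketch: expose the total next to the index so a caller can report progress without going back to the strategy. The names `globalChunkIndex()`, `totalChunks()`, `info` and `describe` below are hypothetical and not part of this PR, and I am assuming the index is zero-based.

```java
// Hypothetical sketch: none of these names exist in the PR.
public class ChunkProgressSketch {

  public interface CompactionChunkInfo {
    // Assumed zero-based position of this chunk among all chunks of the compaction.
    int globalChunkIndex();

    // Total number of chunks planned for the whole compaction.
    int totalChunks();
  }

  // Small factory used only to keep the example self-contained.
  public static CompactionChunkInfo info(int index, int total) {
    return new CompactionChunkInfo() {
      @Override
      public int globalChunkIndex() {
        return index;
      }

      @Override
      public int totalChunks() {
        return total;
      }
    };
  }

  // Formats a one-based progress message such as "chunk 3 of 10".
  public static String describe(CompactionChunkInfo info) {
    return "chunk " + (info.globalChunkIndex() + 1) + " of " + info.totalChunks();
  }

  public static void main(String[] args) {
    System.out.println(describe(info(2, 10))); // prints "chunk 3 of 10"
  }
}
```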

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,119 @@
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when the entire job has completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  // TODO be set once we have an ENUM in core
+  // String COMPACTION_STRATEGY_DEFAULT;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max-chunk-size-bytes";
+  long MAX_CHUNK_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The max number of chunks to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the chunk is determined by the compaction strategy. When running each job chunk will be run
+   * be run independently and asynchronously.

Review comment:
       nit: duplicated "be run"



