Posted to commits@iceberg.apache.org by op...@apache.org on 2020/10/28 04:19:39 UTC
[iceberg] branch master updated: Core: Extract the BaseRewriteDataFilesAction for implementing both flink and spark rewrite actions.
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new da9a0b2 Core: Extract the BaseRewriteDataFilesAction for implementing both flink and spark rewrite actions.
da9a0b2 is described below
commit da9a0b24f21a4390a5270d94c576b62f3b49caec
Author: JunZhang <zh...@126.com>
AuthorDate: Wed Oct 28 12:19:30 2020 +0800
Core: Extract the BaseRewriteDataFilesAction for implementing both flink and spark rewrite actions.
---
.../java/org/apache/iceberg/actions/Action.java | 0
.../org/apache/iceberg/actions/BaseAction.java | 89 ++++++++
.../actions/BaseRewriteDataFilesAction.java | 103 +++++----
.../iceberg/actions/BaseSnapshotUpdateAction.java | 0
.../actions/RewriteDataFilesActionResult.java | 0
.../iceberg/actions/SnapshotUpdateAction.java | 0
.../{BaseAction.java => BaseSparkAction.java} | 60 +-----
.../iceberg/actions/ExpireSnapshotsAction.java | 2 +-
.../iceberg/actions/RemoveOrphanFilesAction.java | 2 +-
.../iceberg/actions/RewriteDataFilesAction.java | 237 +--------------------
10 files changed, 150 insertions(+), 343 deletions(-)
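Before the individual diffs, a sketch of the resulting class layout may help orient readers. The skeletons below are distilled from the diffs that follow (bodies elided); they are not a verbatim copy of any one file:

    // core module: engine-agnostic base classes
    abstract class BaseAction<R> implements Action<R> {
      protected abstract Table table();
      // plus shared helpers: metadataTableName(...), getManifestListPaths(...),
      // getOtherMetadataFilePaths(...)
    }

    public abstract class BaseRewriteDataFilesAction<ThisT>
        extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
      // engine hooks every implementation must supply:
      protected abstract FileIO fileIO();
      protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks);
    }

    // spark module: Spark-specific subclasses
    abstract class BaseSparkAction<R> extends BaseAction<R> {
      // Spark Dataset/DataFrame helpers, e.g. buildValidDataFileDF(...)
    }

    public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
      // parallelizes combined scan tasks as a JavaRDD and rewrites rows via RowDataRewriter
    }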
diff --git a/spark/src/main/java/org/apache/iceberg/actions/Action.java b/core/src/main/java/org/apache/iceberg/actions/Action.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/Action.java
rename to core/src/main/java/org/apache/iceberg/actions/Action.java
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
new file mode 100644
index 0000000..cc6c5fe
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseAction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+abstract class BaseAction<R> implements Action<R> {
+
+ protected abstract Table table();
+
+ protected String metadataTableName(MetadataTableType type) {
+ return metadataTableName(table().name(), type);
+ }
+
+ protected String metadataTableName(String tableName, MetadataTableType type) {
+ if (tableName.contains("/")) {
+ return tableName + "#" + type;
+ } else if (tableName.startsWith("hadoop.")) {
+ // for HadoopCatalog tables, use the table location to load the metadata table
+ // because IcebergCatalog uses HiveCatalog when the table is identified by name
+ return table().location() + "#" + type;
+ } else if (tableName.startsWith("hive.")) {
+ // HiveCatalog prepends a logical name which we need to drop for Spark 2.4
+ return tableName.replaceFirst("hive\\.", "") + "." + type;
+ } else {
+ return tableName + "." + type;
+ }
+ }
+
+ /**
+ * Returns the locations of the manifest lists for the given snapshots.
+ *
+ * @param snapshots the snapshots to inspect
+ * @return the manifest list paths
+ */
+ protected List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
+ List<String> manifestLists = Lists.newArrayList();
+ for (Snapshot snapshot : snapshots) {
+ String manifestListLocation = snapshot.manifestListLocation();
+ if (manifestListLocation != null) {
+ manifestLists.add(manifestListLocation);
+ }
+ }
+ return manifestLists;
+ }
+
+ /**
+ * Returns metadata file paths that may not be referenced by the current metadata. Specifically, this includes
+ * "version-hint" files as well as entries in metadata.previousFiles.
+ *
+ * @param ops TableOperations for the table we will be getting paths from
+ * @return a list of paths to metadata files
+ */
+ protected List<String> getOtherMetadataFilePaths(TableOperations ops) {
+ List<String> otherMetadataFiles = Lists.newArrayList();
+ otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
+
+ TableMetadata metadata = ops.current();
+ otherMetadataFiles.add(metadata.metadataFileLocation());
+ for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
+ otherMetadataFiles.add(previousMetadataFile.file());
+ }
+ return otherMetadataFiles;
+ }
+
+}
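For orientation, here is how metadataTableName(String, MetadataTableType) resolves a few illustrative inputs (the table names below are made up for the example; FILES stands in for any MetadataTableType):

    // "hdfs://nn:8020/warehouse/db/t"  ->  "hdfs://nn:8020/warehouse/db/t#FILES"  (path-based table)
    // "hadoop.db.t"                    ->  table().location() + "#FILES"          (HadoopCatalog; loaded by location)
    // "hive.db.t"                      ->  "db.t.FILES"   (the "hive." prefix is dropped for Spark 2.4)
    // "db.t"                           ->  "db.t.FILES"   (default: append the type)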
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
similarity index 79%
copy from spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
copy to core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
index e1d179a..df547f5 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
@@ -45,42 +45,33 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
-import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.spark.source.RowDataRewriter;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RewriteDataFilesAction
- extends BaseSnapshotUpdateAction<RewriteDataFilesAction, RewriteDataFilesActionResult> {
+public abstract class BaseRewriteDataFilesAction<ThisT>
+ extends BaseSnapshotUpdateAction<ThisT, RewriteDataFilesActionResult> {
- private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(BaseRewriteDataFilesAction.class);
- private final JavaSparkContext sparkContext;
private final Table table;
private final FileIO fileIO;
private final EncryptionManager encryptionManager;
- private final boolean caseSensitive;
+ private boolean caseSensitive;
+ private PartitionSpec spec;
+ private Expression filter;
private long targetSizeInBytes;
private int splitLookback;
private long splitOpenFileCost;
- private PartitionSpec spec = null;
- private Expression filter;
-
- RewriteDataFilesAction(SparkSession spark, Table table) {
- this.sparkContext = new JavaSparkContext(spark.sparkContext());
+ public BaseRewriteDataFilesAction(Table table) {
this.table = table;
this.spec = table.spec();
this.filter = Expressions.alwaysTrue();
- this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
+ this.caseSensitive = false;
long splitSize = PropertyUtil.propertyAsLong(
table.properties(),
@@ -101,27 +92,41 @@ public class RewriteDataFilesAction
TableProperties.SPLIT_OPEN_FILE_COST,
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
- this.fileIO = SparkUtil.serializableFileIO(table);
+ this.fileIO = fileIO();
this.encryptionManager = table.encryption();
}
@Override
- protected RewriteDataFilesAction self() {
- return this;
- }
-
- @Override
protected Table table() {
return table;
}
+ protected EncryptionManager encryptionManager() {
+ return encryptionManager;
+ }
+
+ protected boolean caseSensitive() {
+ return caseSensitive;
+ }
+
+ /**
+ * Whether to match column names case-sensitively when scanning the table.
+ *
+ * @param newCaseSensitive true to use case-sensitive matching
+ * @return this for method chaining
+ */
+ public BaseRewriteDataFilesAction<ThisT> caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
/**
* Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
*
* @param specId PartitionSpec id to rewrite
* @return this for method chaining
*/
- public RewriteDataFilesAction outputSpecId(int specId) {
+ public BaseRewriteDataFilesAction<ThisT> outputSpecId(int specId) {
Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
this.spec = table.specs().get(specId);
return this;
@@ -133,7 +138,7 @@ public class RewriteDataFilesAction
* @param targetSize size in bytes of rewrite data file
* @return this for method chaining
*/
- public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
+ public BaseRewriteDataFilesAction<ThisT> targetSizeInBytes(long targetSize) {
Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
targetSize);
this.targetSizeInBytes = targetSize;
@@ -141,9 +146,9 @@ public class RewriteDataFilesAction
}
/**
- * Specify the number of "bins" considered when trying to pack the next file split into a task.
- * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
- * task with extra planning cost.
+ * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
+ * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
+ * planning cost.
* <p>
* This configuration can reorder the incoming file regions. To preserve the order of lower/upper bounds in file
* metadata, use a lookback of 1.
@@ -151,7 +156,7 @@ public class RewriteDataFilesAction
* @param lookback number of "bins" considered when trying to pack the next file split into a task.
* @return this for method chaining
*/
- public RewriteDataFilesAction splitLookback(int lookback) {
+ public BaseRewriteDataFilesAction<ThisT> splitLookback(int lookback) {
Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
this.splitLookback = lookback;
return this;
@@ -161,13 +166,13 @@ public class RewriteDataFilesAction
* Specify the minimum file size to count when packing file splits into one "bin". If a file is smaller than this
* threshold, Iceberg will use this value as its size during packing.
* <p>
- * this configuration controls the number of files to compact for each task, small value would lead to a
- * high compaction, the default value is 4MB.
+ * This configuration controls the number of files to compact for each task; a small value leads to more files being
+ * compacted per task. The default value is 4MB.
*
* @param openFileCost minimum file size to count to pack into one "bin".
* @return this for method chaining
*/
- public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
+ public BaseRewriteDataFilesAction<ThisT> splitOpenFileCost(long openFileCost) {
Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
this.splitOpenFileCost = openFileCost;
return this;
@@ -180,7 +185,7 @@ public class RewriteDataFilesAction
* @param expr Expression to filter out DataFiles
* @return this for method chaining
*/
- public RewriteDataFilesAction filter(Expression expr) {
+ public BaseRewriteDataFilesAction<ThisT> filter(Expression expr) {
this.filter = Expressions.and(filter, expr);
return this;
}
@@ -213,7 +218,6 @@ public class RewriteDataFilesAction
if (filteredGroupedTasks.isEmpty()) {
return RewriteDataFilesActionResult.empty();
}
-
// Split and combine tasks under each partition
List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
.map(scanTasks -> {
@@ -224,14 +228,7 @@ public class RewriteDataFilesAction
.flatMap(Streams::stream)
.collect(Collectors.toList());
- JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-
- Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
- Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
-
- RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
-
- List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);
+ List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
.flatMap(tasks -> tasks.stream().map(FileScanTask::file))
.collect(Collectors.toList());
@@ -240,25 +237,19 @@ public class RewriteDataFilesAction
return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
}
+
private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
CloseableIterator<FileScanTask> tasksIter) {
ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
Maps.newHashMap(), Lists::newArrayList);
-
- try {
- tasksIter.forEachRemaining(task -> {
+ try (CloseableIterator<FileScanTask> iterator = tasksIter) {
+ iterator.forEachRemaining(task -> {
StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
tasksGroupedByPartition.put(structLike, task);
});
-
- } finally {
- try {
- tasksIter.close();
- } catch (IOException ioe) {
- LOG.warn("Failed to close task iterator", ioe);
- }
+ } catch (IOException e) {
+ LOG.warn("Failed to close task iterator", e);
}
-
return tasksGroupedByPartition.asMap();
}
@@ -267,15 +258,17 @@ public class RewriteDataFilesAction
RewriteFiles rewriteFiles = table.newRewrite();
rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
commit(rewriteFiles);
-
} catch (Exception e) {
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
.run(fileIO::deleteFile);
-
throw e;
}
}
+
+ protected abstract FileIO fileIO();
+
+ protected abstract List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTask);
}
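The two abstract methods above are the entire engine contract. A minimal, hypothetical subclass (class name and body invented for illustration; the real Flink implementation is not part of this commit) would look roughly like:

    import java.util.List;
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.FileIO;

    // Illustration only: an engine plugs in by supplying a FileIO and a task-rewrite strategy.
    public class MyEngineRewriteDataFilesAction
        extends BaseRewriteDataFilesAction<MyEngineRewriteDataFilesAction> {

      MyEngineRewriteDataFilesAction(Table table) {
        super(table);
      }

      @Override
      protected MyEngineRewriteDataFilesAction self() {
        return this;
      }

      @Override
      protected FileIO fileIO() {
        // must be usable on the engine's workers, e.g. a serializable wrapper
        return table().io();
      }

      @Override
      protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
        // engine-specific: distribute the combined tasks, rewrite their rows, and
        // return the newly written DataFiles; the base class commits the swap via
        // replaceDataFiles(...) and cleans up the new files on failure.
        throw new UnsupportedOperationException("illustration only");
      }
    }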
diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java
rename to core/src/main/java/org/apache/iceberg/actions/BaseSnapshotUpdateAction.java
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
rename to core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesActionResult.java
diff --git a/spark/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java b/core/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java
rename to core/src/main/java/org/apache/iceberg/actions/SnapshotUpdateAction.java
diff --git a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java b/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java
similarity index 64%
rename from spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
rename to spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java
index e5898aa..dfbb59a 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/BaseAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/BaseSparkAction.java
@@ -24,14 +24,11 @@ import java.util.List;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.io.ClosingIterator;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -41,62 +38,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-abstract class BaseAction<R> implements Action<R> {
-
- protected abstract Table table();
-
- protected String metadataTableName(MetadataTableType type) {
- return metadataTableName(table().name(), type);
- }
-
- protected String metadataTableName(String tableName, MetadataTableType type) {
- if (tableName.contains("/")) {
- return tableName + "#" + type;
- } else if (tableName.startsWith("hadoop.")) {
- // for HadoopCatalog tables, use the table location to load the metadata table
- // because IcebergCatalog uses HiveCatalog when the table is identified by name
- return table().location() + "#" + type;
- } else if (tableName.startsWith("hive.")) {
- // HiveCatalog prepend a logical name which we need to drop for Spark 2.4
- return tableName.replaceFirst("hive\\.", "") + "." + type;
- } else {
- return tableName + "." + type;
- }
- }
-
- /**
- * Returns all the path locations of all Manifest Lists for a given list of snapshots
- * @param snapshots snapshots
- * @return the paths of the Manifest Lists
- */
- private List<String> getManifestListPaths(Iterable<Snapshot> snapshots) {
- List<String> manifestLists = Lists.newArrayList();
- for (Snapshot snapshot : snapshots) {
- String manifestListLocation = snapshot.manifestListLocation();
- if (manifestListLocation != null) {
- manifestLists.add(manifestListLocation);
- }
- }
- return manifestLists;
- }
-
- /**
- * Returns all Metadata file paths which may not be in the current metadata. Specifically
- * this includes "version-hint" files as well as entries in metadata.previousFiles.
- * @param ops TableOperations for the table we will be getting paths from
- * @return a list of paths to metadata files
- */
- private List<String> getOtherMetadataFilePaths(TableOperations ops) {
- List<String> otherMetadataFiles = Lists.newArrayList();
- otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
-
- TableMetadata metadata = ops.current();
- otherMetadataFiles.add(metadata.metadataFileLocation());
- for (TableMetadata.MetadataLogEntry previousMetadataFile : metadata.previousFiles()) {
- otherMetadataFiles.add(previousMetadataFile.file());
- }
- return otherMetadataFiles;
- }
+abstract class BaseSparkAction<R> extends BaseAction<R> implements Action<R> {
protected Dataset<Row> buildValidDataFileDF(SparkSession spark) {
return buildValidDataFileDF(spark, table().name());
diff --git a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
index ec759ef..0a08d7a 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsAction.java
@@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
* are issued. Deletes are still performed locally after retrieving the results from the Spark executors.
*/
@SuppressWarnings("UnnecessaryAnonymousClass")
-public class ExpireSnapshotsAction extends BaseAction<ExpireSnapshotsActionResult> {
+public class ExpireSnapshotsAction extends BaseSparkAction<ExpireSnapshotsActionResult> {
private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsAction.class);
private static final String DATA_FILE = "Data File";
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
index 13684fa..c38fe84 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RemoveOrphanFilesAction.java
@@ -67,7 +67,7 @@ import org.slf4j.LoggerFactory;
* <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
* the state of the table if another operation is writing at the same time.
*/
-public class RemoveOrphanFilesAction extends BaseAction<List<String>> {
+public class RemoveOrphanFilesAction extends BaseSparkAction<List<String>> {
private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
private static final UserDefinedFunction filename = functions.udf((String path) -> {
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index e1d179a..b22da52 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -19,90 +19,27 @@
package org.apache.iceberg.actions;
-import java.io.IOException;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.source.RowDataRewriter;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.StructLikeWrapper;
-import org.apache.iceberg.util.TableScanUtil;
-import org.apache.iceberg.util.Tasks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class RewriteDataFilesAction
- extends BaseSnapshotUpdateAction<RewriteDataFilesAction, RewriteDataFilesActionResult> {
-
- private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesAction.class);
+ extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
private final JavaSparkContext sparkContext;
- private final Table table;
- private final FileIO fileIO;
- private final EncryptionManager encryptionManager;
- private final boolean caseSensitive;
- private long targetSizeInBytes;
- private int splitLookback;
- private long splitOpenFileCost;
-
- private PartitionSpec spec = null;
- private Expression filter;
RewriteDataFilesAction(SparkSession spark, Table table) {
+ super(table);
this.sparkContext = new JavaSparkContext(spark.sparkContext());
- this.table = table;
- this.spec = table.spec();
- this.filter = Expressions.alwaysTrue();
- this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive", "false"));
-
- long splitSize = PropertyUtil.propertyAsLong(
- table.properties(),
- TableProperties.SPLIT_SIZE,
- TableProperties.SPLIT_SIZE_DEFAULT);
- long targetFileSize = PropertyUtil.propertyAsLong(
- table.properties(),
- TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
- TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
-
- this.splitLookback = PropertyUtil.propertyAsInt(
- table.properties(),
- TableProperties.SPLIT_LOOKBACK,
- TableProperties.SPLIT_LOOKBACK_DEFAULT);
- this.splitOpenFileCost = PropertyUtil.propertyAsLong(
- table.properties(),
- TableProperties.SPLIT_OPEN_FILE_COST,
- TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
-
- this.fileIO = SparkUtil.serializableFileIO(table);
- this.encryptionManager = table.encryption();
}
@Override
@@ -111,171 +48,17 @@ public class RewriteDataFilesAction
}
@Override
- protected Table table() {
- return table;
- }
-
- /**
- * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
- *
- * @param specId PartitionSpec id to rewrite
- * @return this for method chaining
- */
- public RewriteDataFilesAction outputSpecId(int specId) {
- Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
- this.spec = table.specs().get(specId);
- return this;
- }
-
- /**
- * Specify the target rewrite data file size in bytes
- *
- * @param targetSize size in bytes of rewrite data file
- * @return this for method chaining
- */
- public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
- Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
- targetSize);
- this.targetSizeInBytes = targetSize;
- return this;
- }
-
- /**
- * Specify the number of "bins" considered when trying to pack the next file split into a task.
- * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
- * task with extra planning cost.
- * <p>
- * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
- * metadata, user can use a lookback of 1.
- *
- * @param lookback number of "bins" considered when trying to pack the next file split into a task.
- * @return this for method chaining
- */
- public RewriteDataFilesAction splitLookback(int lookback) {
- Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
- this.splitLookback = lookback;
- return this;
- }
-
- /**
- * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
- * threshold, Iceberg will use this value to do count.
- * <p>
- * this configuration controls the number of files to compact for each task, small value would lead to a
- * high compaction, the default value is 4MB.
- *
- * @param openFileCost minimum file size to count to pack into one "bin".
- * @return this for method chaining
- */
- public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
- Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
- this.splitOpenFileCost = openFileCost;
- return this;
- }
-
- /**
- * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
- * filter may be rewritten.
- *
- * @param expr Expression to filter out DataFiles
- * @return this for method chaining
- */
- public RewriteDataFilesAction filter(Expression expr) {
- this.filter = Expressions.and(filter, expr);
- return this;
+ protected FileIO fileIO() {
+ return SparkUtil.serializableFileIO(table());
}
@Override
- public RewriteDataFilesActionResult execute() {
- CloseableIterable<FileScanTask> fileScanTasks = null;
- try {
- fileScanTasks = table.newScan()
- .caseSensitive(caseSensitive)
- .ignoreResiduals()
- .filter(filter)
- .planFiles();
- } finally {
- try {
- if (fileScanTasks != null) {
- fileScanTasks.close();
- }
- } catch (IOException ioe) {
- LOG.warn("Failed to close task iterable", ioe);
- }
- }
-
- Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
- Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
- .filter(kv -> kv.getValue().size() > 1)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
- // Nothing to rewrite if there's only one DataFile in each partition.
- if (filteredGroupedTasks.isEmpty()) {
- return RewriteDataFilesActionResult.empty();
- }
-
- // Split and combine tasks under each partition
- List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
- .map(scanTasks -> {
- CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
- CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
- return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
- })
- .flatMap(Streams::stream)
- .collect(Collectors.toList());
-
+ protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
-
- Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
- Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
-
- RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
-
- List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);
- List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
- .flatMap(tasks -> tasks.stream().map(FileScanTask::file))
- .collect(Collectors.toList());
- replaceDataFiles(currentDataFiles, addedDataFiles);
-
- return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
- }
-
- private Map<StructLikeWrapper, Collection<FileScanTask>> groupTasksByPartition(
- CloseableIterator<FileScanTask> tasksIter) {
- ListMultimap<StructLikeWrapper, FileScanTask> tasksGroupedByPartition = Multimaps.newListMultimap(
- Maps.newHashMap(), Lists::newArrayList);
-
- try {
- tasksIter.forEachRemaining(task -> {
- StructLikeWrapper structLike = StructLikeWrapper.forType(spec.partitionType()).set(task.file().partition());
- tasksGroupedByPartition.put(structLike, task);
- });
-
- } finally {
- try {
- tasksIter.close();
- } catch (IOException ioe) {
- LOG.warn("Failed to close task iterator", ioe);
- }
- }
-
- return tasksGroupedByPartition.asMap();
- }
-
- private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
- try {
- RewriteFiles rewriteFiles = table.newRewrite();
- rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
- commit(rewriteFiles);
-
- } catch (Exception e) {
- Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
- .run(fileIO::deleteFile);
-
- throw e;
- }
+ Broadcast<FileIO> io = sparkContext.broadcast(fileIO());
+ Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager());
+ RowDataRewriter rowDataRewriter =
+ new RowDataRewriter(table(), table().spec(), caseSensitive(), io, encryption);
+ return rowDataRewriter.rewriteDataForTasks(taskRDD);
}
}
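For completeness, the Spark action's constructor remains package-private, so user code still goes through the Spark actions entry point. A hedged usage sketch (Actions.forTable is the existing entry point in this era of the codebase; it is not shown in this diff, and loadTable is a hypothetical helper):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.Actions;
    import org.apache.iceberg.actions.RewriteDataFilesActionResult;
    import org.apache.iceberg.expressions.Expressions;

    Table table = loadTable();  // hypothetical helper: obtain the table from your catalog
    RewriteDataFilesActionResult result = Actions.forTable(table)
        .rewriteDataFiles()
        .targetSizeInBytes(512 * 1024 * 1024L)           // compact toward 512 MB files
        .filter(Expressions.equal("day", "2020-10-28"))  // rewrite only matching files
        .execute();

Note one behavioral subtlety visible in this diff: caseSensitive now defaults to false in the base class instead of being read from spark.sql.caseSensitive, so Spark callers that relied on that setting would need the new caseSensitive(boolean) setter.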