Posted to commits@hudi.apache.org by da...@apache.org on 2021/10/10 01:30:59 UTC
[hudi] branch master updated: [HUDI-2537] Fix metadata table for flink (#3774)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ad63938 [HUDI-2537] Fix metadata table for flink (#3774)
ad63938 is described below
commit ad63938890151c4c13e78acfd448259c3af80f80
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Oct 10 09:30:39 2021 +0800
[HUDI-2537] Fix metadata table for flink (#3774)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 41 +++++++-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 55 +++++++---
.../FlinkHoodieBackedTableMetadataWriter.java | 112 ++++++++-------------
.../org/apache/hudi/table/HoodieFlinkTable.java | 38 +++++++
.../SparkHoodieBackedTableMetadataWriter.java | 40 --------
.../hudi/sink/StreamWriteOperatorCoordinator.java | 15 +--
.../hudi/sink/append/AppendWriteFunction.java | 7 +-
.../java/org/apache/hudi/source/FileIndex.java | 13 ++-
.../sink/TestStreamWriteOperatorCoordinator.java | 14 ++-
.../sink/utils/StreamWriteFunctionWrapper.java | 3 -
.../java/org/apache/hudi/source/TestFileIndex.java | 4 +-
packaging/hudi-flink-bundle/pom.xml | 43 ++++++++
12 files changed, 232 insertions(+), 153 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 19e9d31..ceac9eb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -219,7 +220,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
protected abstract void initialize(HoodieEngineContext engineContext);
- protected void initTableMetadata() {
+ public void initTableMetadata() {
try {
if (this.metadata != null) {
this.metadata.close();
@@ -533,4 +534,42 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* @param instantTime The timestamp to use for the deltacommit.
*/
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
+
+ /**
+ * Perform a compaction on the Metadata Table.
+ *
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+ * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
+ *
+ * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+ * deltacommit.
+ */
+ protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
+ String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+ .get().getTimestamp();
+ List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+ .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+ if (!pendingInstants.isEmpty()) {
+ LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+ pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+ return;
+ }
+
+ // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+ // delta commits synced over will not have an instant time lesser than the last completed instant on the
+ // metadata table.
+ final String compactionInstantTime = latestDeltacommitTime + "001";
+ if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+ writeClient.compact(compactionInstantTime);
+ }
+ }
+
+ protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime) {
+ // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
+ // delta commits synced over will not have an instant time lesser than the last completed instant on the
+ // metadata table.
+ writeClient.clean(instantTime + "002");
+ }
}
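Both helpers above derive auxiliary instant times from the latest completed deltacommit by appending fixed suffixes, so any deltacommit synced over later can never sort before the compaction or clean instant on the metadata timeline. A minimal sketch of that convention; the timestamp value below is an assumed example, not taken from the patch:

    // Suffix convention used by compactIfNecessary()/doClean() above (illustrative fragment).
    String latestDeltacommitTime = "20211010093039";              // assumed example timestamp
    String compactionInstantTime = latestDeltacommitTime + "001"; // metadata-table compaction
    String cleanInstantTime = latestDeltacommitTime + "002";      // metadata-table clean
    // String comparison keeps both suffixed instants after the deltacommit itself and before
    // any strictly later deltacommit timestamp synced from the data table.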
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 669be16..e95b0f8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -19,7 +19,6 @@
package org.apache.hudi.client;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -51,6 +50,8 @@ import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
@@ -86,24 +87,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* FileID to write handle mapping in order to record the write handles for each file group,
* so that we can append the mini-batch data buffer incrementally.
*/
- private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
+ private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
- public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
- this(context, clientConfig, false);
- }
+ /**
+ * Cached metadata writer for coordinator to reuse for each commit.
+ */
+ private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty();
- @Deprecated
- public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+ public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig);
this.bucketToHandles = new HashMap<>();
}
- @Deprecated
- public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
- Option<EmbeddedTimelineService> timelineService) {
- super(context, writeConfig, timelineService);
- }
-
/**
* Complete changes performed at the given instantTime marker with specified action.
*/
@@ -260,6 +255,24 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
// remove the async cleaning
}
+ @Override
+ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+ this.metadataWriterOption.ifPresent(w -> {
+ w.initTableMetadata(); // refresh the timeline
+ w.update(metadata, instantTime);
+ });
+ }
+
+ /**
+ * Initialize the table metadata writer, e.g., bootstrap the metadata table
+ * from the filesystem if it does not exist.
+ */
+ public void initMetadataWriter() {
+ HoodieBackedTableMetadataWriter metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+ FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+ this.metadataWriterOption = Option.of(metadataWriter);
+ }
+
/**
* Starts async cleaning service for finished commits.
*
@@ -347,6 +360,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+ writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
+ // commit to data table after committing to metadata table.
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -381,6 +396,19 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
throw new HoodieNotSupportedException("Clustering is not supported yet");
}
+ private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ HoodieCommitMetadata commitMetadata,
+ HoodieInstant hoodieInstant) {
+ try {
+ this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
+ // No conflict resolution is done here, unlike for regular writes. We take the lock to ensure all writes to the metadata table happen within a
+ // single lock (single writer), because more than one writer to the metadata table would result in conflicts since all of them update the same partition.
+ table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
+ } finally {
+ this.txnManager.endTransaction();
+ }
+ }
+
@Override
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
@@ -478,6 +506,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
} else {
writeTimer = metrics.getDeltaCommitCtx();
}
+ table.getHoodieView().sync();
return table;
}
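The coordinator side is expected to create the write client once and call initMetadataWriter(), so every subsequent commit goes through preCommit() with a cached metadata writer. A hedged sketch of that wiring, assuming a pre-built HoodieWriteConfig named writeConfig (not defined in this patch) and mirroring the raw-typed client usage elsewhere in the diff:

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.common.HoodieFlinkEngineContext;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Sketch only, not coordinator code from this patch.
    static HoodieFlinkWriteClient createCoordinatorWriteClient(HoodieWriteConfig writeConfig) {
      HoodieFlinkWriteClient writeClient =
          new HoodieFlinkWriteClient(HoodieFlinkEngineContext.DEFAULT, writeConfig);
      // Bootstrap the metadata table from the filesystem if it does not exist and cache the
      // writer, so preCommit() can refresh its timeline and apply each commit's metadata.
      writeClient.initMetadataWriter();
      return writeClient;
    }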
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 18a1960..634eaba 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -20,24 +20,17 @@ package org.apache.hudi.metadata;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
@@ -86,82 +79,61 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
- List<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+ List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
- try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) {
- writeClient.startCommitWithTime(instantTime);
- writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+ try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
+ if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+ // if this is a new commit being applied to metadata for the first time
+ writeClient.startCommitWithTime(instantTime);
+ writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+ } else {
+ // This code path refers to a re-attempted commit that got committed to the metadata table but failed in the data table.
+ // For example, say compaction c1 on its 1st attempt succeeded in the metadata table and failed before committing to the data table.
+ // When retried, the data table will first roll back the pending compaction; these rollbacks are applied to the metadata table, but all changes
+ // are upserts to the metadata table, so only a new delta commit will be created.
+ // Once the rollback is complete, the compaction is retried and eventually hits this code block, where the respective commit is
+ // already part of the completed commits. So we have to manually remove the completed instant and proceed.
+ // It is for the same reason that withAllowMultiWriteOnSameInstant is enabled for the metadata table.
+ HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+ HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
+ metadataMetaClient.reloadActiveTimeline();
+ }
- List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime);
+ List<WriteStatus> statuses = records.size() > 0
+ ? writeClient.upsertPreppedRecords(recordList, instantTime)
+ : Collections.emptyList();
statuses.forEach(writeStatus -> {
if (writeStatus.hasErrors()) {
throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
}
});
+ // Flink does not support auto-commit yet; also, the auto-commit logic is not as complete as in AbstractHoodieWriteClient now.
writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
- // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
- // delta commits synced over will not have an instant time lesser than the last completed instant on the
- // metadata table.
- if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
- writeClient.compact(instantTime + "001");
- }
- writeClient.clean(instantTime + "002");
+
+ // reload timeline
+ metadataMetaClient.reloadActiveTimeline();
+ compactIfNecessary(writeClient, instantTime);
+ doClean(writeClient, instantTime);
}
// Update total size of the metadata and count of base/log files
- metrics.ifPresent(m -> {
- try {
- m.updateSizeMetrics(metadataMetaClient, metadata);
- } catch (HoodieIOException e) {
- LOG.error("Could not publish metadata size metrics", e);
- }
- });
+ metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}
/**
- * Tag each record with the location.
- * <p>
- * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
- * base file.
+ * Tag each record with the location in the given partition.
+ *
+ * The record is tagged with respective file slice's location based on its record key.
*/
- private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
- HoodieTable table = HoodieFlinkTable.create(metadataWriteConfig, (HoodieFlinkEngineContext) engineContext);
- TableFileSystemView.SliceView fsView = table.getSliceView();
- List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
- .map(FileSlice::getBaseFile)
- .filter(Option::isPresent)
- .map(Option::get)
- .collect(Collectors.toList());
-
- // All the metadata fits within a single base file
- if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
- if (baseFiles.size() > 1) {
- throw new HoodieMetadataException("Multiple base files found in metadata partition");
- }
- }
-
- String fileId;
- String instantTime;
- if (!baseFiles.isEmpty()) {
- fileId = baseFiles.get(0).getFileId();
- instantTime = "U";
- } else {
- // If there is a log file then we can assume that it has the data
- List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
- .map(FileSlice::getLatestLogFile)
- .filter(Option::isPresent)
- .map(Option::get)
- .collect(Collectors.toList());
- if (logFiles.isEmpty()) {
- // No base and log files. All are new inserts
- fileId = FSUtils.createNewFileIdPfx();
- instantTime = "I";
- } else {
- fileId = logFiles.get(0).getFileId();
- instantTime = "U";
- }
- }
-
- return records.stream().map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))).collect(Collectors.toList());
+ private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+ List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
+ ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
+
+ return records.stream().map(r -> {
+ FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
+ final String instantTime = slice.isEmpty() ? "I" : "U";
+ r.setCurrentLocation(new HoodieRecordLocation(instantTime, slice.getFileId()));
+ return r;
+ }).collect(Collectors.toList());
}
}
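prepRecords() now tags each record with the file slice selected by hashing its record key into one of the partition's file groups. The sketch below shows the general idea of that mapping; it is illustrative only and not Hudi's exact HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex implementation:

    // Illustrative: bucket a record key into one of numFileGroups file groups with a
    // non-negative hash. With numFileGroups = 1, as in the commit() call above, every
    // record maps to index 0, i.e. the single FILES file group.
    static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
      return Math.floorMod(recordKey.hashCode(), numFileGroups);
    }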
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 3e26025..ce63a2d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -29,14 +29,25 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
import java.util.List;
public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
implements ExplicitWriteHandleTable<T> {
+
+ private boolean isMetadataAvailabilityUpdated = false;
+ private boolean isMetadataTableAvailable;
+
protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
@@ -66,4 +77,31 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config);
}
+
+ /**
+ * Fetch instance of {@link HoodieTableMetadataWriter}.
+ *
+ * @return instance of {@link HoodieTableMetadataWriter}
+ */
+ @Override
+ public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+ synchronized (this) {
+ if (!isMetadataAvailabilityUpdated) {
+ // this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
+ // this is done to avoid repeated calls to fs.exists().
+ try {
+ isMetadataTableAvailable = config.isMetadataTableEnabled()
+ && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
+ } catch (IOException e) {
+ throw new HoodieMetadataException("Checking existence of metadata table failed", e);
+ }
+ isMetadataAvailabilityUpdated = true;
+ }
+ }
+ if (isMetadataTableAvailable) {
+ return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
+ } else {
+ return Option.empty();
+ }
+ }
}
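getMetadataWriter() probes the filesystem only once and caches the result for the lifetime of the table object. A generic sketch of that check-once-and-cache idiom follows; expensiveExistenceCheck() is a hypothetical stand-in for the config flag plus fs.exists() probe in the real code:

    // Illustrative class; not Hudi code.
    class AvailabilityCache {
      private boolean checked = false;
      private boolean available;

      synchronized boolean isAvailable() {
        if (!checked) {
          available = expensiveExistenceCheck(); // probe only once, then reuse the cached answer
          checked = true;
        }
        return available;
      }

      private boolean expensiveExistenceCheck() {
        return true; // placeholder for config.isMetadataTableEnabled() && fs.exists(metadataBasePath)
      }
    }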
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index f512b8f..3324455 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -41,9 +41,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Collectors;
public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
@@ -130,44 +128,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
}
/**
- * Perform a compaction on the Metadata Table.
- *
- * Cases to be handled:
- * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
- * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
- *
- * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
- * deltacommit.
- */
- private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
- String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
- .get().getTimestamp();
- List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
- .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
-
- if (!pendingInstants.isEmpty()) {
- LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
- pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
- return;
- }
-
- // Trigger compaction with suffixes based on the same instant time. This ensures that any future
- // delta commits synced over will not have an instant time lesser than the last completed instant on the
- // metadata table.
- final String compactionInstantTime = latestDeltacommitTime + "001";
- if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
- writeClient.compact(compactionInstantTime);
- }
- }
-
- private void doClean(SparkRDDWriteClient writeClient, String instantTime) {
- // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
- // delta commits synced over will not have an instant time lesser than the last completed instant on the
- // metadata table.
- writeClient.clean(instantTime + "002");
- }
-
- /**
* Tag each record with the location in the given partition.
*
* The record is tagged with respective file slice's location based on its record key.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 26c1595..f2844a6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -127,11 +127,6 @@ public class StreamWriteOperatorCoordinator
private HiveSyncContext hiveSyncContext;
/**
- * A single-thread executor to handle metadata table sync.
- */
- private NonThrownExecutor metadataSyncExecutor;
-
- /**
* The table state.
*/
private transient TableState tableState;
@@ -294,7 +289,7 @@ public class StreamWriteOperatorCoordinator
}
private void initMetadataSync() {
- this.metadataSyncExecutor = new NonThrownExecutor(LOG, true);
+ this.writeClient.initMetadataWriter();
}
private void reset() {
@@ -498,14 +493,6 @@ public class StreamWriteOperatorCoordinator
this.executor = executor;
}
- @VisibleForTesting
- public void setMetadataSyncExecutor(NonThrownExecutor executor) throws Exception {
- if (this.metadataSyncExecutor != null) {
- this.metadataSyncExecutor.close();
- }
- this.metadataSyncExecutor = executor;
- }
-
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 128c030..0279313 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -72,8 +72,6 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
// it would check the validity.
// wait for the buffer data flush out and request a new instant
flushData(false);
- // nullify the write helper for next ckp
- this.writerHelper = null;
}
@Override
@@ -133,5 +131,10 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
.endInput(endInput)
.build();
this.eventGateway.sendEventToCoordinator(event);
+ // nullify the write helper for next ckp
+ this.writerHelper = null;
+ this.writeStatuses.addAll(writeStatus);
+ // blocks flushing until the coordinator starts a new instant
+ this.confirming = true;
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 54085eb..07383ef 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -48,10 +49,12 @@ public class FileIndex {
private final Path path;
private final HoodieMetadataConfig metadataConfig;
private List<String> partitionPaths; // cache of partition paths
+ private final boolean tableExists;
private FileIndex(Path path, Configuration conf) {
this.path = path;
this.metadataConfig = metadataConfig(conf);
+ this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf());
}
public static FileIndex instance(Path path, Configuration conf) {
@@ -111,6 +114,9 @@ public class FileIndex {
* Returns all the file statuses under the table base path.
*/
public FileStatus[] getFilesInPartitions() {
+ if (!tableExists) {
+ return new FileStatus[0];
+ }
String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
partitions, "/tmp/")
@@ -165,8 +171,9 @@ public class FileIndex {
if (this.partitionPaths != null) {
return this.partitionPaths;
}
- this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT,
- metadataConfig, path.toString());
+ this.partitionPaths = this.tableExists
+ ? FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString())
+ : Collections.emptyList();
return this.partitionPaths;
}
@@ -174,7 +181,7 @@ public class FileIndex {
Properties properties = new Properties();
// set up metadata.enabled=true in table DDL to enable metadata listing
- properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+ properties.put(HoodieMetadataConfig.ENABLE.key(), conf.getBoolean(FlinkOptions.METADATA_ENABLED));
return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
}
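The metadataConfig() change swaps the Properties key from the ConfigProperty object to its String key. Since java.util.Properties entries are read back by String key when the config builder is populated, putting the ConfigProperty object itself stores a value that is presumably never found, so the flag would effectively fall back to its default. A tiny illustrative fragment:

    import java.util.Properties;
    import org.apache.hudi.common.config.HoodieMetadataConfig;

    Properties properties = new Properties();
    // Correct: the builder reads entries back by their String key.
    properties.put(HoodieMetadataConfig.ENABLE.key(), true);
    // Wrong (pre-patch): the ConfigProperty object itself becomes the key and is never looked up.
    // properties.put(HoodieMetadataConfig.ENABLE, true);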
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0538714..1fdb5ca 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -181,7 +180,7 @@ public class TestStreamWriteOperatorCoordinator {
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
}
- @Disabled
+ @Test
void testSyncMetadataTable() throws Exception {
// reset
reset();
@@ -193,7 +192,6 @@ public class TestStreamWriteOperatorCoordinator {
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
- coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
@@ -209,7 +207,7 @@ public class TestStreamWriteOperatorCoordinator {
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is("0000000000000"));
// test metadata table compaction
- // write another 4 commits
+ // write another 3 commits
for (int i = 1; i < 4; i++) {
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();
@@ -247,7 +245,13 @@ public class TestStreamWriteOperatorCoordinator {
double failureFraction) {
final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
writeStatus.setPartitionPath(partitionPath);
- writeStatus.setStat(new HoodieWriteStat());
+
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setFileId("fileId123");
+ writeStat.setPath("path123");
+
+ writeStatus.setStat(writeStat);
return WriteMetadataEvent.builder()
.taskID(taskId)
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c5d3ec5..6b6bede 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -142,9 +142,6 @@ public class StreamWriteFunctionWrapper<I> {
public void openFunction() throws Exception {
this.coordinator.start();
this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
- if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
- this.coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(coordinatorContext));
- }
toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
toHoodieFunction.setRuntimeContext(runtimeContext);
toHoodieFunction.open(conf);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index 060974d..334df59 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -27,7 +27,6 @@ import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
@@ -90,7 +89,8 @@ public class TestFileIndex {
assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
}
- @Disabled
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
void testFileListingEmptyTable(boolean enableMetadata) {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata);
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 380199e..120cba3 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -144,6 +144,10 @@
<include>org.apache.flink:flink-sql-connector-hive-2.3.6_${scala.binary.version}</include>
<include>org.apache.hbase:hbase-common</include>
+ <include>org.apache.hbase:hbase-client</include>
+ <include>org.apache.hbase:hbase-server</include>
+ <include>org.apache.hbase:hbase-protocol</include>
+ <include>org.apache.htrace:htrace-core</include>
<include>commons-codec:commons-codec</include>
</includes>
</artifactSet>
@@ -594,6 +598,45 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-protocol</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.htrace</groupId>
+ <artifactId>htrace-core</artifactId>
+ <version>${htrace.version}</version>
+ </dependency>
</dependencies>
<profiles>