Posted to commits@hudi.apache.org by vi...@apache.org on 2021/05/14 02:25:36 UTC
[hudi] branch master updated: [HUDI-1900] Always close the file handle for a flink mini-batch write (#2943)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ad77cf4 [HUDI-1900] Always close the file handle for a flink mini-batch write (#2943)
ad77cf4 is described below
commit ad77cf42ba7d8eab086be1f6d91f465bf29164ed
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri May 14 10:25:18 2021 +0800
[HUDI-1900] Always close the file handle for a flink mini-batch write (#2943)
Close the file handle eagerly to avoid corrupted files as much as
possible.
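The pattern the patch applies across the Flink handles: each mini-batch flush calls close() on its handle right away, and the failure-path closeGracefully() becomes idempotent. A minimal stand-alone sketch of that pattern — not Hudi code; the class name, fields, and use of java.nio instead of the Hadoop FileSystem API are assumptions for illustration:

    import java.io.IOException;
    import java.io.Writer;
    import java.nio.file.Files;
    import java.nio.file.Path;

    class MiniBatchFileHandle {
      private final Path filePath;
      private final Writer writer;
      private boolean isClosed = false; // guards against a second close on failure paths

      MiniBatchFileHandle(Path filePath) throws IOException {
        this.filePath = filePath;
        this.writer = Files.newBufferedWriter(filePath);
      }

      void write(String record) throws IOException {
        writer.write(record);
      }

      // Called at the end of every mini-batch: the file is sealed immediately,
      // so no open writer can be left behind to produce a corrupted file later.
      void close() throws IOException {
        try {
          writer.close();
        } finally {
          isClosed = true;
        }
      }

      // Best-effort close used when the task is disposed: never closes twice,
      // and removes the half-written file instead of leaving it corrupted.
      void closeGracefully() {
        if (isClosed) {
          return;
        }
        try {
          close();
        } catch (Throwable t) {
          try {
            Files.deleteIfExists(filePath);
          } catch (IOException ignored) {
            // best effort only
          }
        }
      }
    }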
---
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 13 +-
.../apache/hudi/client/HoodieFlinkWriteClient.java | 19 +-
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 68 +++----
.../java/org/apache/hudi/io/FlinkCreateHandle.java | 79 ++-------
.../apache/hudi/io/FlinkMergeAndReplaceHandle.java | 196 +++++++++++++++++++++
.../java/org/apache/hudi/io/FlinkMergeHandle.java | 92 +++-------
.../java/org/apache/hudi/io/MiniBatchHandle.java | 30 ++--
.../commit/BaseFlinkCommitActionExecutor.java | 58 +++---
.../hudi/table/action/commit/FlinkMergeHelper.java | 22 +--
.../delta/BaseFlinkDeltaCommitActionExecutor.java | 3 -
.../org/apache/hudi/sink/StreamWriteFunction.java | 26 +--
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 5 +-
.../sink/utils/StreamWriteFunctionWrapper.java | 1 +
13 files changed, 338 insertions(+), 274 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index d0778f7..9800749 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -106,9 +106,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
TaskContextSupplier taskContextSupplier) {
+ this(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
+ hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+ }
+
+ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+ TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile) {
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
init(fileId, recordItr);
- init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
+ init(fileId, partitionPath, baseFile);
}
/**
@@ -173,6 +180,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
}
}
+ protected void setWriteStatusPath() {
+ writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);
+ }
+
protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
oldFilePath = makeNewFilePath(partitionPath, oldFileName);
newFilePath = makeNewFilePath(partitionPath, newFileName);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 6cbbeec..9ba0961 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -47,6 +47,7 @@ import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkCreateHandle;
+import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
@@ -376,12 +377,10 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
/**
- * Clean the write handles within a checkpoint interval, this operation
- * would close the underneath file handles.
+ * Clean the write handles within a checkpoint interval.
+ * All the handles should have been closed already.
*/
public void cleanHandles() {
- this.bucketToHandles.values()
- .forEach(handle -> ((MiniBatchHandle) handle).finishWrite());
this.bucketToHandles.clear();
}
@@ -414,10 +413,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
Iterator<HoodieRecord<T>> recordItr) {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
+ final String partitionPath = record.getPartitionPath();
if (bucketToHandles.containsKey(fileID)) {
- return bucketToHandles.get(fileID);
+ MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
+ if (lastHandle.shouldReplace()) {
+ HoodieWriteHandle<?, ?, ?, ?> writeHandle = new FlinkMergeAndReplaceHandle<>(
+ config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(),
+ lastHandle.getWritePath());
+ this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
+ return writeHandle;
+ }
}
- final String partitionPath = record.getPartitionPath();
+
final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
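The hunk above is the heart of the handle reuse: when a bucket already has a handle and that handle's file must be replaced, a merge-and-replace handle is seeded with the previous mini-batch's write path. A stand-alone sketch of that decision, with hypothetical names (BatchHandle, HandleCache) standing in for MiniBatchHandle and the bucketToHandles map:

    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.Supplier;

    class HandleCache {

      interface BatchHandle {
        boolean shouldReplace(); // true for base-file handles, false for log-file handles
        Path getWritePath();     // the file produced by the previous mini-batch
      }

      private final Map<String, BatchHandle> bucketToHandles = new HashMap<>();

      BatchHandle getOrCreate(String fileId,
                              Function<Path, BatchHandle> replaceFactory,
                              Supplier<BatchHandle> freshFactory) {
        BatchHandle last = bucketToHandles.get(fileId);
        if (last != null && last.shouldReplace()) {
          // Base files: the new buffer must be merged with the file written by the
          // previous mini-batch and replace it under the same name.
          BatchHandle replacing = replaceFactory.apply(last.getWritePath());
          bucketToHandles.put(fileId, replacing);
          return replacing;
        }
        // First mini-batch of the bucket, or a log-file handle whose file can simply
        // keep growing: build a fresh handle for this batch.
        BatchHandle fresh = freshFactory.get();
        bucketToHandles.put(fileId, fresh);
        return fresh;
      }
    }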
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index c859d26..987f335 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -23,36 +23,32 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
- * A {@link HoodieAppendHandle} that supports append write incrementally(mini-batches).
+ * A {@link HoodieAppendHandle} that supports APPEND write incrementally (mini-batches).
*
- * <p>For the first mini-batch, it initialize and set up the next file path to write,
- * but does not close the file writer until all the mini-batches write finish. Each mini-batch
- * data are appended to this handle, the back-up writer may rollover on condition.
+ * <p>For the first mini-batch, it initializes and sets up the next file path to write,
+ * then closes the file writer. The subsequent mini-batches are appended to the same file
+ * through a different append handle with the same write file name.
*
- * @param <T> Payload type
- * @param <I> Input type
- * @param <K> Key type
- * @param <O> Output type
+ * <p>The back-up writer may roll over under certain conditions (e.g., the filesystem does not support append
+ * or the file size hits the configured threshold).
*/
public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
private static final Logger LOG = LoggerFactory.getLogger(FlinkAppendHandle.class);
- private boolean shouldRollover = false;
+ private boolean isClosed = false;
public FlinkAppendHandle(
HoodieWriteConfig config,
@@ -92,49 +88,37 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
return true;
}
- public boolean shouldRollover() {
- return this.shouldRollover;
- }
-
- /**
- * Appends new records into this append handle.
- * @param recordItr The new records iterator
- */
- public void appendNewRecords(Iterator<HoodieRecord<T>> recordItr) {
- this.recordItr = recordItr;
- }
-
@Override
public List<WriteStatus> close() {
- shouldRollover = true;
- // flush any remaining records to disk
- appendDataAndDeleteBlocks(header);
- // need to fix that the incremental write size in bytes may be lost
- List<WriteStatus> ret = new ArrayList<>(statuses);
- statuses.clear();
- return ret;
- }
-
- @Override
- public void finishWrite() {
- LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException e) {
- throw new HoodieUpsertException("Failed to close append handle", e);
+ return super.close();
+ } finally {
+ this.isClosed = true;
}
}
@Override
public void closeGracefully() {
+ if (isClosed) {
+ return;
+ }
try {
- finishWrite();
+ close();
} catch (Throwable throwable) {
// The intermediate log file can still append based on the incremental MERGE semantics,
// there is no need to delete the file.
LOG.warn("Error while trying to dispose the APPEND handle", throwable);
}
}
+
+ @Override
+ public Path getWritePath() {
+ return writer.getLogFile().getPath();
+ }
+
+ @Override
+ public boolean shouldReplace() {
+ // log files can append new data buffer directly
+ return false;
+ }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index fe5bf99..2de6a57 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -23,11 +23,9 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
@@ -36,27 +34,24 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
/**
- * A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches).
+ * A {@link HoodieCreateHandle} that supports CREATE write incrementally (mini-batches).
*
- * <p>For the first mini-batch, it initialize and set up the next file path to write,
- * but does not close the file writer until all the mini-batches write finish. Each mini-batch
- * data are appended to the same file.
+ * <p>For the first mini-batch, it initializes and sets up the next file path to write,
+ * then closes the file writer. Each subsequent mini-batch is written to a file with a new name,
+ * and that new file is then renamed to this file name,
+ * so it behaves as if every mini-batch were appended to the same file.
*
- * @param <T> Payload type
- * @param <I> Input type
- * @param <K> Key type
- * @param <O> Output type
+ * @see FlinkMergeAndReplaceHandle
*/
public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {
private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
- private long lastFileSize = 0L;
- private long totalRecordsWritten = 0L;
+
+ private boolean isClosed = false;
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
@@ -138,57 +133,22 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
return makeNewFilePath(partitionPath, dataFileName);
}
- /**
- * Get the incremental write status. In mini-batch write mode,
- * this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches),
- * thus, after a mini-batch append finish, we do not close the underneath writer but return
- * the incremental WriteStatus instead.
- *
- * @return the incremental write status
- */
- private WriteStatus getIncrementalWriteStatus() {
- try {
- setupWriteStatus();
- // reset the write status
- totalRecordsWritten += recordsWritten;
- recordsWritten = 0;
- recordsDeleted = 0;
- insertRecordsWritten = 0;
- timer = new HoodieTimer().startTimer();
- writeStatus.setTotalErrorRecords(0);
- return writeStatus;
- } catch (IOException e) {
- throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
- }
- }
-
@Override
- protected long computeTotalWriteBytes() {
- long fileSizeInBytes = computeFileSizeInBytes();
- long incFileSizeInBytes = fileSizeInBytes - lastFileSize;
- this.lastFileSize = fileSizeInBytes;
- return incFileSizeInBytes;
- }
-
- @Override
- protected long computeFileSizeInBytes() {
- return fileWriter.getBytesWritten();
- }
-
- @Override
- public void finishWrite() {
- LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + totalRecordsWritten);
+ public List<WriteStatus> close() {
try {
- fileWriter.close();
- } catch (IOException e) {
- throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
+ return super.close();
+ } finally {
+ this.isClosed = true;
}
}
@Override
public void closeGracefully() {
+ if (isClosed) {
+ return;
+ }
try {
- finishWrite();
+ close();
} catch (Throwable throwable) {
LOG.warn("Error while trying to dispose the CREATE handle", throwable);
try {
@@ -201,11 +161,8 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
}
}
- /**
- * Performs actions to durably, persist the current changes and returns a WriteStatus object.
- */
@Override
- public List<WriteStatus> close() {
- return Collections.singletonList(getIncrementalWriteStatus());
+ public Path getWritePath() {
+ return path;
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
new file mode 100644
index 0000000..44d630a
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A {@link HoodieMergeHandle} that supports MERGE write incrementally (small data buffers).
+ *
+ * <p>This handle is needed from the second mini-batch write on for a COW data bucket
+ * that is written using multiple mini-batches.
+ *
+ * <p>For the incremental data buffer, it initializes and sets up the next file path to write,
+ * then closes the file and renames it to the old file name,
+ * so it behaves as if the new data buffer were appended to the old file.
+ */
+public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
+ extends HoodieMergeHandle<T, I, K, O>
+ implements MiniBatchHandle {
+
+ private static final Logger LOG = LogManager.getLogger(FlinkMergeAndReplaceHandle.class);
+
+ private boolean isClosed = false;
+
+ public FlinkMergeAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
+ TaskContextSupplier taskContextSupplier, Path basePath) {
+ super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier,
+ new HoodieBaseFile(basePath.toString()));
+ // delete invalid data files generated by task retry.
+ if (getAttemptId() > 0) {
+ deleteInvalidDataFile(getAttemptId() - 1);
+ }
+ }
+
+ /**
+ * The Flink checkpoints start in sequence and run asynchronously: when one write task finishes checkpoint (A)
+ * (so the filesystem view sees the written data files, some of which may be invalid),
+ * it immediately goes on writing for the next checkpoint (B).
+ * If it tries to reuse the last small data bucket (small file) of an invalid data file,
+ * then, when the coordinator receives the checkpoint success event of checkpoint (A),
+ * the invalid data file gets cleaned up,
+ * and this merger hits a FileNotFoundException when it closes the write file handle.
+ *
+ * <p>To solve this, the invalid data file is deleted eagerly
+ * so that the small bucket of an invalid file is never reused.
+ *
+ * @param lastAttemptId The last attempt ID
+ */
+ private void deleteInvalidDataFile(long lastAttemptId) {
+ final String lastWriteToken = FSUtils.makeWriteToken(getPartitionId(), getStageId(), lastAttemptId);
+ final String lastDataFileName = FSUtils.makeDataFileName(instantTime,
+ lastWriteToken, this.fileId, hoodieTable.getBaseFileExtension());
+ final Path path = makeNewFilePath(partitionPath, lastDataFileName);
+ try {
+ if (fs.exists(path)) {
+ LOG.info("Deleting invalid MERGE and REPLACE base file due to task retry: " + lastDataFileName);
+ fs.delete(path, false);
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Error while deleting the MERGE and REPLACE base file due to task retry: " + lastDataFileName, e);
+ }
+ }
+
+ @Override
+ protected void createMarkerFile(String partitionPath, String dataFileName) {
+ // no need to create any marker file for intermediate file.
+ }
+
+ @Override
+ protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
+ // the old and new file names are expected to be the same.
+ if (!oldFileName.equals(newFileName)) {
+ LOG.warn("MERGE and REPLACE handle expects the same name for old and new files,\n"
+ + "but got new file: " + newFileName + " with old file: " + oldFileName + ",\n"
+ + "this rarely happens, only when the checkpoint success event has not been received yet\n"
+ + "but the write task flushes with a new instant time, which does not break the UPSERT semantics");
+ }
+ super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
+ try {
+ int rollNumber = 0;
+ while (fs.exists(newFilePath)) {
+ Path oldPath = newFilePath;
+ newFileName = newFileNameWithRollover(rollNumber++);
+ newFilePath = makeNewFilePath(partitionPath, newFileName);
+ LOG.warn("Duplicate write for MERGE and REPLACE handle with path: " + oldPath + ", rolls over to new path: " + newFilePath);
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Checking existing path for merge and replace handle error: " + newFilePath, e);
+ }
+ }
+
+ /**
+ * Use the writeToken + "-" + rollNumber as the new writeToken of a mini-batch write.
+ */
+ protected String newFileNameWithRollover(int rollNumber) {
+ // make the intermediate file as hidden
+ return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber,
+ this.fileId, hoodieTable.getBaseFileExtension());
+ }
+
+ @Override
+ protected void setWriteStatusPath() {
+ // should still report the old file path.
+ writeStatus.getStat().setPath(new Path(config.getBasePath()), oldFilePath);
+ }
+
+ boolean needsUpdateLocation() {
+ // No need to update location for Flink hoodie records because all the records are pre-tagged
+ // with the desired locations.
+ return false;
+ }
+
+ public void finalizeWrite() {
+ // The file visibility should be kept by the configured ConsistencyGuard instance.
+ try {
+ fs.delete(oldFilePath, false);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error while cleaning the old base file: " + oldFilePath, e);
+ }
+ try {
+ fs.rename(newFilePath, oldFilePath);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error while renaming the temporary roll file: "
+ + newFilePath + " to old base file name: " + oldFilePath, e);
+ }
+ }
+
+ @Override
+ public List<WriteStatus> close() {
+ try {
+ List<WriteStatus> writeStatuses = super.close();
+ finalizeWrite();
+ return writeStatuses;
+ } finally {
+ this.isClosed = true;
+ }
+ }
+
+ @Override
+ public void closeGracefully() {
+ if (isClosed) {
+ return;
+ }
+ try {
+ close();
+ } catch (Throwable throwable) {
+ LOG.warn("Error while trying to dispose the MERGE handle", throwable);
+ try {
+ fs.delete(newFilePath, false);
+ LOG.info("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " success!");
+ } catch (IOException e) {
+ // logging a warning and ignore the exception.
+ LOG.warn("Deleting the intermediate MERGE and REPLACE data file: " + newFilePath + " failed", e);
+ }
+ }
+ }
+
+ @Override
+ public Path getWritePath() {
+ return oldFilePath;
+ }
+}
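The finalizeWrite() above publishes the merged rollover file under the previous file's name. A compact sketch of the same swap using plain java.nio.file — the real handle goes through the Hadoop FileSystem and a ConsistencyGuard, so this is illustrative only:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class MergeAndReplaceSketch {
      // oldFilePath: the file produced by the previous mini-batch (the name readers expect).
      // newFilePath: the rollover file holding the old content merged with the new buffer.
      static void finalizeWrite(Path oldFilePath, Path newFilePath) throws IOException {
        Files.deleteIfExists(oldFilePath);    // drop the previous mini-batch file
        Files.move(newFilePath, oldFilePath); // publish the merged file under the same name
      }
    }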
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 518ea69..870a6aa 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -23,12 +23,10 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -40,7 +38,7 @@ import java.util.Iterator;
import java.util.List;
/**
- * A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers).
+ * A {@link HoodieMergeHandle} that supports MERGE write incrementally (small data buffers).
*
* <p>For a new data buffer, it initialize and set up the next file path to write,
* and closes the file path when the data buffer write finish. When next data buffer
@@ -48,10 +46,7 @@ import java.util.List;
* for a checkpoint round, it renames the last new file path as the desired file name
* (name with the expected file ID).
*
- * @param <T> Payload type
- * @param <I> Input type
- * @param <K> Key type
- * @param <O> Output type
+ * @see FlinkMergeAndReplaceHandle
*/
public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieMergeHandle<T, I, K, O>
@@ -59,15 +54,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class);
- /**
- * Records the current file handles number that rolls over.
- */
- private int rollNumber = 0;
-
- /**
- * Whether the handle should roll over to new, E.G. the handle has written some intermediate files already.
- */
- private volatile boolean shouldRollover = false;
+ private boolean isClosed = false;
/**
* Records the rolled over file paths.
@@ -130,6 +117,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
rolloverPaths = new ArrayList<>();
try {
+ int rollNumber = 0;
while (fs.exists(newFilePath)) {
oldFilePath = newFilePath; // override the old file name
rolloverPaths.add(oldFilePath);
@@ -151,15 +139,15 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
this.fileId, hoodieTable.getBaseFileExtension());
}
- public boolean shouldRollover() {
- return shouldRollover;
- }
-
@Override
public List<WriteStatus> close() {
- List<WriteStatus> writeStatus = super.close();
- this.shouldRollover = true;
- return writeStatus;
+ try {
+ List<WriteStatus> writeStatus = super.close();
+ finalizeWrite();
+ return writeStatus;
+ } finally {
+ this.isClosed = true;
+ }
}
boolean needsUpdateLocation() {
@@ -168,53 +156,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
return false;
}
- /**
- *
- * Rolls over the write handle to prepare for the next batch write.
- *
- * <p>It tweaks the handle state as following:
- *
- * <ul>
- * <li>Increment the {@code rollNumber}</li>
- * <li>Book-keep the last file path, these files (except the last one) are temporary that need to be cleaned</li>
- * <li>Make the last new file path as old</li>
- * <li>Initialize the new file path and file writer</li>
- * </ul>
- *
- * @param newRecordsItr The records iterator to update
- */
- public void rollOver(Iterator<HoodieRecord<T>> newRecordsItr) {
- init(this.fileId, newRecordsItr);
- this.recordsWritten = 0;
- this.recordsDeleted = 0;
- this.updatedRecordsWritten = 0;
- this.insertRecordsWritten = 0;
- this.writeStatus.setTotalErrorRecords(0);
- this.timer = new HoodieTimer().startTimer();
-
- rollNumber += 1;
-
- rolloverPaths.add(newFilePath);
- oldFilePath = newFilePath;
- final String newFileName = newFileNameWithRollover(rollNumber);
- newFilePath = makeNewFilePath(partitionPath, newFileName);
-
- // create the marker file so that the intermediate roll over files
- // of the retry task can be cleaned.
- MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
- markerFiles.createIfNotExists(partitionPath, newFileName, getIOType());
-
- try {
- fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier);
- } catch (IOException e) {
- throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e);
- }
-
- LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
- newFilePath.toString()));
- }
-
- public void finishWrite() {
+ public void finalizeWrite() {
// The file visibility should be kept by the configured ConsistencyGuard instance.
rolloverPaths.add(newFilePath);
if (rolloverPaths.size() == 1) {
@@ -241,8 +183,11 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
@Override
public void closeGracefully() {
+ if (isClosed) {
+ return;
+ }
try {
- finishWrite();
+ close();
} catch (Throwable throwable) {
LOG.warn("Error while trying to dispose the MERGE handle", throwable);
try {
@@ -254,4 +199,9 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
}
}
}
+
+ @Override
+ public Path getWritePath() {
+ return newFilePath;
+ }
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
index 30ac317..5f022d0 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
@@ -18,28 +18,36 @@
package org.apache.hudi.io;
+import org.apache.hadoop.fs.Path;
+
/**
* Hoodie write handle that supports write as mini-batch.
*/
public interface MiniBatchHandle {
/**
- * Returns whether the handle should roll over to new,
- * E.G. the handle has written some intermediate data buffer already.
- */
- default boolean shouldRollover() {
- return false;
- }
-
- /**
- * Finish the write of multiple mini-batches. Usually these mini-bathes
- * come from one checkpoint interval.
+ * Finalize the write of one mini-batch. Usually these mini-batches
+ * come from one checkpoint interval. The file handle may roll over to a new name
+ * if the name conflicts, giving a chance to clean up the intermediate file.
*/
- void finishWrite();
+ default void finalizeWrite() {}
/**
* Close the file handle gracefully; if any error happens while closing the file handle,
* clean up the file so that no corrupted file is left behind.
*/
void closeGracefully();
+
+ /**
+ * Returns the write file path.
+ */
+ Path getWritePath();
+
+ /**
+ * Whether the old write file should be replaced by a new file with the same name,
+ * whose content is the old file merged with the incremental data batch.
+ */
+ default boolean shouldReplace() {
+ return true;
+ }
}
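For orientation, a stand-alone sketch of the two flavours this interface distinguishes — hypothetical classes, roughly the roles played by the Flink create/merge handles (replace) and the append handle (no replace):

    import java.nio.file.Path;

    interface SimpleBatchHandle {
      Path getWritePath();
      default boolean shouldReplace() { return true; }
      default void finalizeWrite() {}
      void closeGracefully();
    }

    // Base-file flavour: the next mini-batch must merge with this file and replace it.
    final class BaseFileHandle implements SimpleBatchHandle {
      private final Path baseFile;
      BaseFileHandle(Path baseFile) { this.baseFile = baseFile; }
      @Override public Path getWritePath() { return baseFile; }
      @Override public void closeGracefully() { /* close the writer; delete the file on failure */ }
    }

    // Log-file flavour: new buffers can simply be appended, so no replacement is needed.
    final class LogFileHandle implements SimpleBatchHandle {
      private final Path logFile;
      LogFileHandle(Path logFile) { this.logFile = logFile; }
      @Override public Path getWritePath() { return logFile; }
      @Override public boolean shouldReplace() { return false; }
      @Override public void closeGracefully() { /* keep the log file; appended blocks stay readable */ }
    }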
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index f3e10e8..5cfd28b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -32,11 +32,9 @@ import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
-import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
@@ -160,10 +158,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
}
}
- protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
- return Collections.emptyMap();
- }
-
@Override
protected boolean isWorkloadProfileNeeded() {
return true;
@@ -177,26 +171,29 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
BucketType bucketType,
Iterator recordItr) {
try {
- switch (bucketType) {
- case INSERT:
- return handleInsert(fileIdHint, recordItr);
- case UPDATE:
- if (this.writeHandle instanceof HoodieCreateHandle) {
- // During one checkpoint interval, an insert record could also be updated,
- // for example, for an operation sequence of a record:
- // I, U, | U, U
- // - batch1 - | - batch2 -
- // the first batch(batch1) operation triggers an INSERT bucket,
- // the second batch batch2 tries to reuse the same bucket
- // and append instead of UPDATE.
+ if (this.writeHandle instanceof HoodieCreateHandle) {
+ // During one checkpoint interval, an insert record could also be updated,
+ // for example, for an operation sequence of a record:
+ // I, U, | U, U
+ // - batch1 - | - batch2 -
+ // the first batch(batch1) operation triggers an INSERT bucket,
+ // the second batch batch2 tries to reuse the same bucket
+ // and append instead of UPDATE.
+ return handleInsert(fileIdHint, recordItr);
+ } else if (this.writeHandle instanceof HoodieMergeHandle) {
+ return handleUpdate(partitionPath, fileIdHint, recordItr);
+ } else {
+ switch (bucketType) {
+ case INSERT:
return handleInsert(fileIdHint, recordItr);
- }
- return handleUpdate(partitionPath, fileIdHint, recordItr);
- default:
- throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath);
+ case UPDATE:
+ return handleUpdate(partitionPath, fileIdHint, recordItr);
+ default:
+ throw new AssertionError();
+ }
}
} catch (Throwable t) {
- String msg = "Error upserting bucketType " + bucketType + " for partition :" + partitionPath;
+ String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
@@ -212,7 +209,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
// these are updates
- HoodieMergeHandle upsertHandle = getUpdateHandle(recordItr);
+ HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?, ?>) this.writeHandle;
return handleUpdateInternal(upsertHandle, fileId);
}
@@ -234,19 +231,6 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
- protected FlinkMergeHandle getUpdateHandle(Iterator<HoodieRecord<T>> recordItr) {
- if (table.requireSortedRecords()) {
- throw new HoodieNotSupportedException("Sort records are not supported in Flink streaming write");
- } else {
- FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle;
- // add the incremental records.
- if (writeHandle.shouldRollover()) {
- writeHandle.rollOver(recordItr);
- }
- return writeHandle;
- }
- }
-
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
index 5e9d8cb..6391750 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -62,17 +61,14 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
@Override
public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
- HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
- final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
- Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
- HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
- HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
-
+ HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle) throws IOException {
final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;
- if (isFirstTimeMerge(mergeHandle)
- && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) {
+
+ final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
+ HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
+ if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
@@ -83,10 +79,11 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
}
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
+ Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
try {
final Iterator<GenericRecord> readerIterator;
- if (isFirstTimeMerge(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) {
+ if (baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
readerIterator = reader.getRecordIterator(readSchema);
@@ -114,9 +111,4 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
}
}
}
-
- private static boolean isFirstTimeMerge(HoodieMergeHandle<?, ?, ?, ?> mergeHandle) {
- return mergeHandle instanceof FlinkMergeHandle && !((FlinkMergeHandle) mergeHandle).shouldRollover();
- }
-
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
index 674a8fd..1fe9820 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
@@ -49,9 +49,6 @@ public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordP
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
- if (appendHandle.shouldRollover()) {
- appendHandle.appendNewRecords(recordItr);
- }
appendHandle.doAppend();
List<WriteStatus> writeStatuses = appendHandle.close();
return Collections.singletonList(writeStatuses).iterator();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index d9cb389..e35419a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -143,22 +143,6 @@ public class StreamWriteFunction<K, I, O>
private transient TotalSizeTracer tracer;
/**
- * Flag saying whether the write task is waiting for the checkpoint success notification
- * after it finished a checkpoint.
- *
- * <p>The flag is needed because the write task does not block during the waiting time interval,
- * some data buckets still flush out with old instant time. There are two cases that the flush may produce
- * corrupted files if the old instant is committed successfully:
- * 1) the write handle was writing data but interrupted, left a corrupted parquet file;
- * 2) the write handle finished the write but was not closed, left an empty parquet file.
- *
- * <p>To solve, when this flag was set to true, we flush the data buffer with a new instant time = old instant time + 1ms,
- * the new instant time would affect the write file name. The filesystem view does not recognize the file as committed because
- * it always filters the data files based on successful commit time.
- */
- private volatile boolean confirming = false;
-
- /**
* Constructs a StreamingSinkFunction.
*
* @param config The config options
@@ -207,7 +191,7 @@ public class StreamWriteFunction<K, I, O>
@Override
public void notifyCheckpointComplete(long checkpointId) {
- this.confirming = false;
+ this.writeClient.cleanHandles();
}
/**
@@ -416,16 +400,12 @@ public class StreamWriteFunction<K, I, O>
return;
}
- // if we are waiting for the checkpoint notification, shift the write instant time.
- boolean shift = confirming && StreamerUtil.equal(instant, this.currentInstant);
- final String flushInstant = shift ? StreamerUtil.instantTimePlus(instant, 1) : instant;
-
List<HoodieRecord> records = bucket.records;
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
- final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, flushInstant));
+ final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
@@ -473,7 +453,5 @@ public class StreamWriteFunction<K, I, O>
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
- this.writeClient.cleanHandles();
- this.confirming = true;
}
}
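Net effect on the write function's lifecycle: the instant-shifting "confirming" flag is no longer needed because every flush closes its handle eagerly, so notifyCheckpointComplete() only has to drop the cached handles. A simplified, hypothetical sketch of that flow (not the real StreamWriteFunction):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class CheckpointFlowSketch {
      private final Map<String, List<String>> buckets = new HashMap<>();
      private final Map<String, AutoCloseable> bucketToHandles = new HashMap<>();

      // Called from snapshotState(): flush every buffered bucket with the current
      // instant time; each flush seals its file handle before returning.
      void flushRemaining(String instant) throws Exception {
        for (Map.Entry<String, List<String>> bucket : buckets.entrySet()) {
          AutoCloseable handle = bucketToHandles.get(bucket.getKey());
          // ... write bucket.getValue() under `instant` ...
          if (handle != null) {
            handle.close(); // eager close: no open writer survives the snapshot
          }
        }
        buckets.clear();
      }

      // Called from notifyCheckpointComplete(): the handles are already closed,
      // so cleaning up the checkpoint interval is just forgetting them.
      void notifyCheckpointComplete(long checkpointId) {
        bucketToHandles.clear();
      }
    }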
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 7da9801..21fbc5a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -500,9 +500,8 @@ public class TestWriteCopyOnWrite {
Map<String, String> getMiniBatchExpected() {
Map<String, String> expected = new HashMap<>();
- expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,1,par1, "
- + "id1,par1,id1,Danny,23,1,par1, "
+ // the last 3 lines are merged
+ expected.put("par1", "["
+ "id1,par1,id1,Danny,23,1,par1, "
+ "id1,par1,id1,Danny,23,1,par1]");
return expected;
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 5050109..a4b6c16 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -163,6 +163,7 @@ public class StreamWriteFunctionWrapper<I> {
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
coordinator.notifyCheckpointComplete(checkpointId);
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+ this.writeFunction.notifyCheckpointComplete(checkpointId);
if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
try {
compactFunctionWrapper.compact(checkpointId);