You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/08/29 12:46:59 UTC
[3/4] flink git commit: [hotfix][streaming] Allow to override methods
from TwoPhaseCommitSinkFunction
[hotfix][streaming] Allow to override methods from TwoPhaseCommitSinkFunction
This allow for some custom user logic during handling checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d9cdcba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d9cdcba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d9cdcba
Branch: refs/heads/master
Commit: 9d9cdcbad6ccf353a1252866f6a56ac505bfaa95
Parents: 959d54f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 14 15:45:45 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200
----------------------------------------------------------------------
.../api/functions/sink/TwoPhaseCommitSinkFunction.java | 4 ++--
.../api/functions/sink/TwoPhaseCommitSinkFunctionTest.java | 5 ++++-
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 77f74fe..18f74b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -222,7 +222,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
}
@Override
- public final void snapshotState(FunctionSnapshotContext context) throws Exception {
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
@@ -246,7 +246,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
}
@Override
- public final void initializeState(FunctionInitializationContext context) throws Exception {
+ public void initializeState(FunctionInitializationContext context) throws Exception {
// when we are restoring state with pendingCommitTransactions, we don't really know whether the
// transactions were already committed, or whether there was a failure between
// completing the checkpoint on the master, and notifying the writer here.
http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index b9097d7..4715c39 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -155,7 +155,10 @@ public class TwoPhaseCommitSinkFunctionTest {
@Override
protected void commit(FileTransaction transaction) {
try {
- Files.move(transaction.tmpFile.toPath(), new File(targetDirectory, transaction.tmpFile.getName()).toPath(), ATOMIC_MOVE);
+ Files.move(
+ transaction.tmpFile.toPath(),
+ new File(targetDirectory, transaction.tmpFile.getName()).toPath(),
+ ATOMIC_MOVE);
} catch (IOException e) {
throw new IllegalStateException(e);
}