You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/08/29 12:46:59 UTC

[3/4] flink git commit: [hotfix][streaming] Allow to override methods from TwoPhaseCommitSinkFunction

[hotfix][streaming] Allow to override methods from TwoPhaseCommitSinkFunction

This allows for some custom user logic while handling checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d9cdcba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d9cdcba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d9cdcba

Branch: refs/heads/master
Commit: 9d9cdcbad6ccf353a1252866f6a56ac505bfaa95
Parents: 959d54f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 14 15:45:45 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../api/functions/sink/TwoPhaseCommitSinkFunction.java          | 4 ++--
 .../api/functions/sink/TwoPhaseCommitSinkFunctionTest.java      | 5 ++++-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 77f74fe..18f74b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -222,7 +222,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 	}
 
 	@Override
-	public final void snapshotState(FunctionSnapshotContext context) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		// this is like the pre-commit of a 2-phase-commit transaction
 		// we are ready to commit and remember the transaction
 
@@ -246,7 +246,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 	}
 
 	@Override
-	public final void initializeState(FunctionInitializationContext context) throws Exception {
+	public void initializeState(FunctionInitializationContext context) throws Exception {
 		// when we are restoring state with pendingCommitTransactions, we don't really know whether the
 		// transactions were already committed, or whether there was a failure between
 		// completing the checkpoint on the master, and notifying the writer here.

http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index b9097d7..4715c39 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -155,7 +155,10 @@ public class TwoPhaseCommitSinkFunctionTest {
 		@Override
 		protected void commit(FileTransaction transaction) {
 			try {
-				Files.move(transaction.tmpFile.toPath(), new File(targetDirectory, transaction.tmpFile.getName()).toPath(), ATOMIC_MOVE);
+				Files.move(
+					transaction.tmpFile.toPath(),
+					new File(targetDirectory, transaction.tmpFile.getName()).toPath(),
+					ATOMIC_MOVE);
 			} catch (IOException e) {
 				throw new IllegalStateException(e);
 			}