You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/08/29 12:46:57 UTC

[1/4] flink git commit: [hotfix][streaming] Fix logging in TwoPhaseCommitSinkFunction

Repository: flink
Updated Branches:
  refs/heads/master 257a5a541 -> 1e2a63874


[hotfix][streaming] Fix logging in TwoPhaseCommitSinkFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e2a6387
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e2a6387
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e2a6387

Branch: refs/heads/master
Commit: 1e2a63874f848fc97b4bceae54fb646c3b128f4f
Parents: 9d9cdcb
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Aug 22 16:06:33 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e2a6387/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 18f74b6..6040979 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -285,7 +285,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 		}
 		// if in restore we didn't get any userContext or we are initializing from scratch
 		if (userContext == null) {
-			LOG.info("{} - no state to restore {}", name());
+			LOG.info("{} - no state to restore", name());
 
 			userContext = initializeUserContext();
 		}


[3/4] flink git commit: [hotfix][streaming] Allow to override methods from TwoPhaseCommitSinkFunction

Posted by al...@apache.org.
[hotfix][streaming] Allow to override methods from TwoPhaseCommitSinkFunction

This allows for custom user logic when handling checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d9cdcba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d9cdcba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d9cdcba

Branch: refs/heads/master
Commit: 9d9cdcbad6ccf353a1252866f6a56ac505bfaa95
Parents: 959d54f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 14 15:45:45 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../api/functions/sink/TwoPhaseCommitSinkFunction.java          | 4 ++--
 .../api/functions/sink/TwoPhaseCommitSinkFunctionTest.java      | 5 ++++-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 77f74fe..18f74b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -222,7 +222,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 	}
 
 	@Override
-	public final void snapshotState(FunctionSnapshotContext context) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		// this is like the pre-commit of a 2-phase-commit transaction
 		// we are ready to commit and remember the transaction
 
@@ -246,7 +246,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 	}
 
 	@Override
-	public final void initializeState(FunctionInitializationContext context) throws Exception {
+	public void initializeState(FunctionInitializationContext context) throws Exception {
 		// when we are restoring state with pendingCommitTransactions, we don't really know whether the
 		// transactions were already committed, or whether there was a failure between
 		// completing the checkpoint on the master, and notifying the writer here.

http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index b9097d7..4715c39 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -155,7 +155,10 @@ public class TwoPhaseCommitSinkFunctionTest {
 		@Override
 		protected void commit(FileTransaction transaction) {
 			try {
-				Files.move(transaction.tmpFile.toPath(), new File(targetDirectory, transaction.tmpFile.getName()).toPath(), ATOMIC_MOVE);
+				Files.move(
+					transaction.tmpFile.toPath(),
+					new File(targetDirectory, transaction.tmpFile.getName()).toPath(),
+					ATOMIC_MOVE);
 			} catch (IOException e) {
 				throw new IllegalStateException(e);
 			}


[2/4] flink git commit: [FLINK-7497][streaming] Introduce user context in TwoPhaseCommitSinkFunction

Posted by al...@apache.org.
[FLINK-7497][streaming] Introduce user context in TwoPhaseCommitSinkFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/959d54fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/959d54fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/959d54fc

Branch: refs/heads/master
Commit: 959d54fc828691759f15f2e83c0c123e9da6e782
Parents: ac72360
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 21 16:53:07 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../sink/TwoPhaseCommitSinkFunction.java        | 57 ++++++++++++++++----
 .../sink/TwoPhaseCommitSinkFunctionTest.java    |  4 +-
 2 files changed, 48 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/959d54fc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index b73272d..77f74fe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -37,6 +37,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -49,22 +50,25 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * @param <IN> Input type for {@link SinkFunction}.
  * @param <TXN> Transaction to store all of the information required to handle a transaction.
+ * @param <CONTEXT> Context that will be shared across all invocations for the given {@link TwoPhaseCommitSinkFunction}
+ *                 instance. Context is created once
  */
 @PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 		extends RichSinkFunction<IN>
 		implements CheckpointedFunction, CheckpointListener {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-	protected final ListStateDescriptor<State<TXN>> stateDescriptor;
+	protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
 
 	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
 
 	@Nullable
 	protected TXN currentTransaction;
+	protected Optional<CONTEXT> userContext;
 
-	protected ListState<State<TXN>> state;
+	protected ListState<State<TXN, CONTEXT>> state;
 
 	/**
 	 * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
@@ -77,8 +81,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	 * </pre>
 	 * @param stateTypeInformation {@link TypeInformation} for POJO holding state of opened transactions.
 	 */
-	public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN>> stateTypeInformation) {
-		this(new ListStateDescriptor<State<TXN>>("state", stateTypeInformation));
+	public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN, CONTEXT>> stateTypeInformation) {
+		this(new ListStateDescriptor<State<TXN, CONTEXT>>("state", stateTypeInformation));
 	}
 
 	/**
@@ -86,10 +90,18 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	 *
 	 * @param stateDescriptor descriptor for transactions POJO.
 	 */
-	public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN>> stateDescriptor) {
+	public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor) {
 		this.stateDescriptor = requireNonNull(stateDescriptor, "stateDescriptor is null");
 	}
 
+	protected Optional<CONTEXT> initializeUserContext() {
+		return Optional.empty();
+	}
+
+	protected Optional<CONTEXT> getUserContext() {
+		return userContext;
+	}
+
 	// ------ methods that should be implemented in child class to support two phase commit algorithm ------
 
 	/**
@@ -142,6 +154,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		abort(transaction);
 	}
 
+	protected void finishRecoveringContext() {
+	}
+
 	// ------ entry points for above methods implementing {@CheckPointedFunction} and {@CheckpointListener} ------
 
 	@Override
@@ -226,7 +241,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		state.clear();
 		state.add(new State<>(
 			this.currentTransaction,
-			new ArrayList<>(pendingCommitTransactions.values())));
+			new ArrayList<>(pendingCommitTransactions.values()),
+			userContext));
 	}
 
 	@Override
@@ -250,7 +266,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		if (context.isRestored()) {
 			LOG.info("{} - restoring state", name());
 
-			for (State<TXN> operatorState : state.get()) {
+			for (State<TXN, CONTEXT> operatorState : state.get()) {
+				userContext = operatorState.getContext();
 				List<TXN> recoveredTransactions = operatorState.getPendingCommitTransactions();
 				for (TXN recoveredTransaction : recoveredTransactions) {
 					// If this fails, there is actually a data loss
@@ -260,9 +277,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 
 				recoverAndAbort(operatorState.getPendingTransaction());
 				LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
+
+				if (userContext.isPresent()) {
+					finishRecoveringContext();
+				}
 			}
-		} else {
+		}
+		// if in restore we didn't get any userContext or we are initializing from scratch
+		if (userContext == null) {
 			LOG.info("{} - no state to restore {}", name());
+
+			userContext = initializeUserContext();
 		}
 		this.pendingCommitTransactions.clear();
 
@@ -291,14 +316,16 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	/**
 	 * State POJO class coupling pendingTransaction, context and pendingCommitTransactions.
 	 */
-	public static class State<TXN> {
+	public static class State<TXN, CONTEXT> {
 		protected TXN pendingTransaction;
 		protected List<TXN> pendingCommitTransactions = new ArrayList<>();
+		protected Optional<CONTEXT> context;
 
 		public State() {
 		}
 
-		public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions) {
+		public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions, Optional<CONTEXT> context) {
+			this.context = requireNonNull(context, "context is null");
 			this.pendingTransaction = requireNonNull(pendingTransaction, "pendingTransaction is null");
 			this.pendingCommitTransactions = requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null");
 		}
@@ -318,5 +345,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		public void setPendingCommitTransactions(List<TXN> pendingCommitTransactions) {
 			this.pendingCommitTransactions = pendingCommitTransactions;
 		}
+
+		public Optional<CONTEXT> getContext() {
+			return context;
+		}
+
+		public void setContext(Optional<CONTEXT> context) {
+			this.context = context;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/959d54fc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index f2fcb96..b9097d7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -120,12 +120,12 @@ public class TwoPhaseCommitSinkFunctionTest {
 		assertEquals(expectedValues, actualValues);
 	}
 
-	private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction> {
+	private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
 		private final File tmpDirectory;
 		private final File targetDirectory;
 
 		public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) {
-			super(TypeInformation.of(new TypeHint<State<FileTransaction>>() {}));
+			super(TypeInformation.of(new TypeHint<State<FileTransaction, Void>>() {}));
 
 			if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
 				throw new IllegalArgumentException();


[4/4] flink git commit: [FLINK-7498][streaming] Bind together state fields of TwoPhaseCommitSinkFunction

Posted by al...@apache.org.
[FLINK-7498][streaming] Bind together state fields of TwoPhaseCommitSinkFunction

Make sure that state fields are coupled together between checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac72360c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac72360c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac72360c

Branch: refs/heads/master
Commit: ac72360cc0e71d6f543d93c9c1f117babaa35799
Parents: 257a5a5
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 21 16:43:26 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200

----------------------------------------------------------------------
 .../sink/TwoPhaseCommitSinkFunction.java        | 97 +++++++++++---------
 .../sink/TwoPhaseCommitSinkFunctionTest.java    |  4 +-
 2 files changed, 57 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac72360c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 58532f5..b73272d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -57,15 +57,14 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
 
-	protected final ListStateDescriptor<List<TXN>> pendingCommitTransactionsDescriptor;
-	protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
+	protected final ListStateDescriptor<State<TXN>> stateDescriptor;
 
 	protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
 
 	@Nullable
 	protected TXN currentTransaction;
-	protected ListState<TXN> pendingTransactionsState;
-	protected ListState<List<TXN>> pendingCommitTransactionsState;
+
+	protected ListState<State<TXN>> state;
 
 	/**
 	 * Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
@@ -73,32 +72,22 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 	 * {@link TypeInformation#of(TypeHint)}. Example:
 	 * <pre>
 	 * {@code
-	 * TwoPhaseCommitSinkFunction(
-	 *     TypeInformation.of(TXN.class),
-	 *     TypeInformation.of(new TypeHint<List<TXN>>() {}));
+	 * TwoPhaseCommitSinkFunction(TypeInformation.of(new TypeHint<State<TXN, CONTEXT>>() {}));
 	 * }
 	 * </pre>
-	 * @param txnTypeInformation {@link TypeInformation} for transaction POJO.
-	 * @param txnListTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+	 * @param stateTypeInformation {@link TypeInformation} for POJO holding state of opened transactions.
 	 */
-	public TwoPhaseCommitSinkFunction(
-			TypeInformation<TXN> txnTypeInformation,
-			TypeInformation<List<TXN>> txnListTypeInformation) {
-		this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
-			new ListStateDescriptor<>("pendingCommitTransactions", txnListTypeInformation));
+	public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN>> stateTypeInformation) {
+		this(new ListStateDescriptor<State<TXN>>("state", stateTypeInformation));
 	}
 
 	/**
 	 * Instantiate {@link TwoPhaseCommitSinkFunction} with custom state descriptors.
 	 *
-	 * @param pendingTransactionsDescriptor descriptor for transaction POJO.
-	 * @param pendingCommitTransactionsDescriptor descriptor for mapping between checkpointId and transaction POJO.
+	 * @param stateDescriptor descriptor for transactions POJO.
 	 */
-	public TwoPhaseCommitSinkFunction(
-			ListStateDescriptor<TXN> pendingTransactionsDescriptor,
-			ListStateDescriptor<List<TXN>> pendingCommitTransactionsDescriptor) {
-		this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
-		this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+	public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN>> stateDescriptor) {
+		this.stateDescriptor = requireNonNull(stateDescriptor, "stateDescriptor is null");
 	}
 
 	// ------ methods that should be implemented in child class to support two phase commit algorithm ------
@@ -195,11 +184,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		// ==> There should never be a case where we have no pending transaction here
 		//
 
-		Iterator<Map.Entry<Long, TXN>> pendingTransactionsIterator = pendingCommitTransactions.entrySet().iterator();
-		checkState(pendingTransactionsIterator.hasNext(), "checkpoint completed, but no transaction pending");
+		Iterator<Map.Entry<Long, TXN>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
+		checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
 
-		while (pendingTransactionsIterator.hasNext()) {
-			Map.Entry<Long, TXN> entry = pendingTransactionsIterator.next();
+		while (pendingTransactionIterator.hasNext()) {
+			Map.Entry<Long, TXN> entry = pendingTransactionIterator.next();
 			Long pendingTransactionCheckpointId = entry.getKey();
 			TXN pendingTransaction = entry.getValue();
 			if (pendingTransactionCheckpointId > checkpointId) {
@@ -213,7 +202,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 
 			LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
 
-			pendingTransactionsIterator.remove();
+			pendingTransactionIterator.remove();
 		}
 	}
 
@@ -234,13 +223,10 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		currentTransaction = beginTransaction();
 		LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
 
-		pendingCommitTransactionsState.clear();
-		pendingCommitTransactionsState.add(new ArrayList<>(pendingCommitTransactions.values()));
-
-		pendingTransactionsState.clear();
-		// in case of failure we might not be able to abort currentTransaction. Let's store it into the state
-		// so it can be aborted after a restart/crash
-		pendingTransactionsState.add(currentTransaction);
+		state.clear();
+		state.add(new State<>(
+			this.currentTransaction,
+			new ArrayList<>(pendingCommitTransactions.values())));
 	}
 
 	@Override
@@ -259,24 +245,21 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 		// we can have more than one transaction to check in case of a scale-in event, or
 		// for the reasons discussed in the 'notifyCheckpointComplete()' method.
 
-		pendingTransactionsState = context.getOperatorStateStore().getListState(pendingTransactionsDescriptor);
-		pendingCommitTransactionsState = context.getOperatorStateStore().getListState(pendingCommitTransactionsDescriptor);
+		state = context.getOperatorStateStore().getListState(stateDescriptor);
 
 		if (context.isRestored()) {
 			LOG.info("{} - restoring state", name());
 
-			for (List<TXN> recoveredTransactions : pendingCommitTransactionsState.get()) {
+			for (State<TXN> operatorState : state.get()) {
+				List<TXN> recoveredTransactions = operatorState.getPendingCommitTransactions();
 				for (TXN recoveredTransaction : recoveredTransactions) {
 					// If this fails, there is actually a data loss
 					recoverAndCommit(recoveredTransaction);
 					LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
 				}
-			}
 
-			// Explicitly abort transactions that could be not closed cleanly
-			for (TXN pendingTransaction : pendingTransactionsState.get()) {
-				recoverAndAbort(pendingTransaction);
-				LOG.info("{} aborted recovered transaction {}", name(), pendingTransaction);
+				recoverAndAbort(operatorState.getPendingTransaction());
+				LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
 			}
 		} else {
 			LOG.info("{} - no state to restore {}", name());
@@ -304,4 +287,36 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
 			getRuntimeContext().getIndexOfThisSubtask(),
 			getRuntimeContext().getNumberOfParallelSubtasks());
 	}
+
+	/**
+	 * State POJO class coupling pendingTransaction, context and pendingCommitTransactions.
+	 */
+	public static class State<TXN> {
+		protected TXN pendingTransaction;
+		protected List<TXN> pendingCommitTransactions = new ArrayList<>();
+
+		public State() {
+		}
+
+		public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions) {
+			this.pendingTransaction = requireNonNull(pendingTransaction, "pendingTransaction is null");
+			this.pendingCommitTransactions = requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null");
+		}
+
+		public TXN getPendingTransaction() {
+			return pendingTransaction;
+		}
+
+		public void setPendingTransaction(TXN pendingTransaction) {
+			this.pendingTransaction = pendingTransaction;
+		}
+
+		public List<TXN> getPendingCommitTransactions() {
+			return pendingCommitTransactions;
+		}
+
+		public void setPendingCommitTransactions(List<TXN> pendingCommitTransactions) {
+			this.pendingCommitTransactions = pendingCommitTransactions;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac72360c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 9d01e74..f2fcb96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -125,9 +125,7 @@ public class TwoPhaseCommitSinkFunctionTest {
 		private final File targetDirectory;
 
 		public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) {
-			super(
-				TypeInformation.of(FileTransaction.class),
-				TypeInformation.of(new TypeHint<List<FileTransaction>>() {}));
+			super(TypeInformation.of(new TypeHint<State<FileTransaction>>() {}));
 
 			if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
 				throw new IllegalArgumentException();