You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/08/29 12:46:57 UTC
[1/4] flink git commit: [hotfix][streaming] Fix logging in
TwoPhaseCommitSinkFunction
Repository: flink
Updated Branches:
refs/heads/master 257a5a541 -> 1e2a63874
[hotfix][streaming] Fix logging in TwoPhaseCommitSinkFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e2a6387
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e2a6387
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e2a6387
Branch: refs/heads/master
Commit: 1e2a63874f848fc97b4bceae54fb646c3b128f4f
Parents: 9d9cdcb
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Aug 22 16:06:33 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200
----------------------------------------------------------------------
.../streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1e2a6387/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 18f74b6..6040979 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -285,7 +285,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (userContext == null) {
- LOG.info("{} - no state to restore {}", name());
+ LOG.info("{} - no state to restore", name());
userContext = initializeUserContext();
}
[3/4] flink git commit: [hotfix][streaming] Allow to override methods
from TwoPhaseCommitSinkFunction
Posted by al...@apache.org.
[hotfix][streaming] Allow to override methods from TwoPhaseCommitSinkFunction
This allows for some custom user logic while handling checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d9cdcba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d9cdcba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d9cdcba
Branch: refs/heads/master
Commit: 9d9cdcbad6ccf353a1252866f6a56ac505bfaa95
Parents: 959d54f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 14 15:45:45 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200
----------------------------------------------------------------------
.../api/functions/sink/TwoPhaseCommitSinkFunction.java | 4 ++--
.../api/functions/sink/TwoPhaseCommitSinkFunctionTest.java | 5 ++++-
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 77f74fe..18f74b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -222,7 +222,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
}
@Override
- public final void snapshotState(FunctionSnapshotContext context) throws Exception {
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
@@ -246,7 +246,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
}
@Override
- public final void initializeState(FunctionInitializationContext context) throws Exception {
+ public void initializeState(FunctionInitializationContext context) throws Exception {
// when we are restoring state with pendingCommitTransactions, we don't really know whether the
// transactions were already committed, or whether there was a failure between
// completing the checkpoint on the master, and notifying the writer here.
http://git-wip-us.apache.org/repos/asf/flink/blob/9d9cdcba/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index b9097d7..4715c39 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -155,7 +155,10 @@ public class TwoPhaseCommitSinkFunctionTest {
@Override
protected void commit(FileTransaction transaction) {
try {
- Files.move(transaction.tmpFile.toPath(), new File(targetDirectory, transaction.tmpFile.getName()).toPath(), ATOMIC_MOVE);
+ Files.move(
+ transaction.tmpFile.toPath(),
+ new File(targetDirectory, transaction.tmpFile.getName()).toPath(),
+ ATOMIC_MOVE);
} catch (IOException e) {
throw new IllegalStateException(e);
}
[2/4] flink git commit: [FLINK-7497][streaming] Introduce user
context in TwoPhaseCommitSinkFunction
Posted by al...@apache.org.
[FLINK-7497][streaming] Introduce user context in TwoPhaseCommitSinkFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/959d54fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/959d54fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/959d54fc
Branch: refs/heads/master
Commit: 959d54fc828691759f15f2e83c0c123e9da6e782
Parents: ac72360
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 21 16:53:07 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200
----------------------------------------------------------------------
.../sink/TwoPhaseCommitSinkFunction.java | 57 ++++++++++++++++----
.../sink/TwoPhaseCommitSinkFunctionTest.java | 4 +-
2 files changed, 48 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/959d54fc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index b73272d..77f74fe 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -37,6 +37,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static java.util.Objects.requireNonNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -49,22 +50,25 @@ import static org.apache.flink.util.Preconditions.checkState;
*
* @param <IN> Input type for {@link SinkFunction}.
* @param <TXN> Transaction to store all of the information required to handle a transaction.
+ * @param <CONTEXT> Context that will be shared across all invocations for the given {@link TwoPhaseCommitSinkFunction}
+ * instance. Context is created once
*/
@PublicEvolving
-public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
+public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
- protected final ListStateDescriptor<State<TXN>> stateDescriptor;
+ protected final ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor;
protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
@Nullable
protected TXN currentTransaction;
+ protected Optional<CONTEXT> userContext;
- protected ListState<State<TXN>> state;
+ protected ListState<State<TXN, CONTEXT>> state;
/**
* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
@@ -77,8 +81,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
* </pre>
* @param stateTypeInformation {@link TypeInformation} for POJO holding state of opened transactions.
*/
- public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN>> stateTypeInformation) {
- this(new ListStateDescriptor<State<TXN>>("state", stateTypeInformation));
+ public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN, CONTEXT>> stateTypeInformation) {
+ this(new ListStateDescriptor<State<TXN, CONTEXT>>("state", stateTypeInformation));
}
/**
@@ -86,10 +90,18 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
*
* @param stateDescriptor descriptor for transactions POJO.
*/
- public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN>> stateDescriptor) {
+ public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor) {
this.stateDescriptor = requireNonNull(stateDescriptor, "stateDescriptor is null");
}
+ protected Optional<CONTEXT> initializeUserContext() {
+ return Optional.empty();
+ }
+
+ protected Optional<CONTEXT> getUserContext() {
+ return userContext;
+ }
+
// ------ methods that should be implemented in child class to support two phase commit algorithm ------
/**
@@ -142,6 +154,9 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
abort(transaction);
}
+ protected void finishRecoveringContext() {
+ }
+
// ------ entry points for above methods implementing {@CheckPointedFunction} and {@CheckpointListener} ------
@Override
@@ -226,7 +241,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
state.clear();
state.add(new State<>(
this.currentTransaction,
- new ArrayList<>(pendingCommitTransactions.values())));
+ new ArrayList<>(pendingCommitTransactions.values()),
+ userContext));
}
@Override
@@ -250,7 +266,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
if (context.isRestored()) {
LOG.info("{} - restoring state", name());
- for (State<TXN> operatorState : state.get()) {
+ for (State<TXN, CONTEXT> operatorState : state.get()) {
+ userContext = operatorState.getContext();
List<TXN> recoveredTransactions = operatorState.getPendingCommitTransactions();
for (TXN recoveredTransaction : recoveredTransactions) {
// If this fails, there is actually a data loss
@@ -260,9 +277,17 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
recoverAndAbort(operatorState.getPendingTransaction());
LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
+
+ if (userContext.isPresent()) {
+ finishRecoveringContext();
+ }
}
- } else {
+ }
+ // if in restore we didn't get any userContext or we are initializing from scratch
+ if (userContext == null) {
LOG.info("{} - no state to restore {}", name());
+
+ userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
@@ -291,14 +316,16 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
/**
* State POJO class coupling pendingTransaction, context and pendingCommitTransactions.
*/
- public static class State<TXN> {
+ public static class State<TXN, CONTEXT> {
protected TXN pendingTransaction;
protected List<TXN> pendingCommitTransactions = new ArrayList<>();
+ protected Optional<CONTEXT> context;
public State() {
}
- public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions) {
+ public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions, Optional<CONTEXT> context) {
+ this.context = requireNonNull(context, "context is null");
this.pendingTransaction = requireNonNull(pendingTransaction, "pendingTransaction is null");
this.pendingCommitTransactions = requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null");
}
@@ -318,5 +345,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
public void setPendingCommitTransactions(List<TXN> pendingCommitTransactions) {
this.pendingCommitTransactions = pendingCommitTransactions;
}
+
+ public Optional<CONTEXT> getContext() {
+ return context;
+ }
+
+ public void setContext(Optional<CONTEXT> context) {
+ this.context = context;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/959d54fc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index f2fcb96..b9097d7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -120,12 +120,12 @@ public class TwoPhaseCommitSinkFunctionTest {
assertEquals(expectedValues, actualValues);
}
- private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction> {
+ private static class FileBasedSinkFunction extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
private final File tmpDirectory;
private final File targetDirectory;
public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) {
- super(TypeInformation.of(new TypeHint<State<FileTransaction>>() {}));
+ super(TypeInformation.of(new TypeHint<State<FileTransaction, Void>>() {}));
if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
throw new IllegalArgumentException();
[4/4] flink git commit: [FLINK-7498][streaming] Bind together state
fields of TwoPhaseCommitSinkFunction
Posted by al...@apache.org.
[FLINK-7498][streaming] Bind together state fields of TwoPhaseCommitSinkFunction
Make sure that state fields are coupled together between checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac72360c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac72360c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac72360c
Branch: refs/heads/master
Commit: ac72360cc0e71d6f543d93c9c1f117babaa35799
Parents: 257a5a5
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Mon Aug 21 16:43:26 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Aug 29 14:45:34 2017 +0200
----------------------------------------------------------------------
.../sink/TwoPhaseCommitSinkFunction.java | 97 +++++++++++---------
.../sink/TwoPhaseCommitSinkFunctionTest.java | 4 +-
2 files changed, 57 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ac72360c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 58532f5..b73272d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -57,15 +57,14 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
private static final Logger LOG = LoggerFactory.getLogger(TwoPhaseCommitSinkFunction.class);
- protected final ListStateDescriptor<List<TXN>> pendingCommitTransactionsDescriptor;
- protected final ListStateDescriptor<TXN> pendingTransactionsDescriptor;
+ protected final ListStateDescriptor<State<TXN>> stateDescriptor;
protected final LinkedHashMap<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();
@Nullable
protected TXN currentTransaction;
- protected ListState<TXN> pendingTransactionsState;
- protected ListState<List<TXN>> pendingCommitTransactionsState;
+
+ protected ListState<State<TXN>> state;
/**
* Use default {@link ListStateDescriptor} for internal state serialization. Helpful utilities for using this
@@ -73,32 +72,22 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
* {@link TypeInformation#of(TypeHint)}. Example:
* <pre>
* {@code
- * TwoPhaseCommitSinkFunction(
- * TypeInformation.of(TXN.class),
- * TypeInformation.of(new TypeHint<List<TXN>>() {}));
+ * TwoPhaseCommitSinkFunction(TypeInformation.of(new TypeHint<State<TXN, CONTEXT>>() {}));
* }
* </pre>
- * @param txnTypeInformation {@link TypeInformation} for transaction POJO.
- * @param txnListTypeInformation {@link TypeInformation} for mapping between checkpointId and transaction.
+ * @param stateTypeInformation {@link TypeInformation} for POJO holding state of opened transactions.
*/
- public TwoPhaseCommitSinkFunction(
- TypeInformation<TXN> txnTypeInformation,
- TypeInformation<List<TXN>> txnListTypeInformation) {
- this(new ListStateDescriptor<>("pendingTransactions", txnTypeInformation),
- new ListStateDescriptor<>("pendingCommitTransactions", txnListTypeInformation));
+ public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN>> stateTypeInformation) {
+ this(new ListStateDescriptor<State<TXN>>("state", stateTypeInformation));
}
/**
* Instantiate {@link TwoPhaseCommitSinkFunction} with custom state descriptors.
*
- * @param pendingTransactionsDescriptor descriptor for transaction POJO.
- * @param pendingCommitTransactionsDescriptor descriptor for mapping between checkpointId and transaction POJO.
+ * @param stateDescriptor descriptor for transactions POJO.
*/
- public TwoPhaseCommitSinkFunction(
- ListStateDescriptor<TXN> pendingTransactionsDescriptor,
- ListStateDescriptor<List<TXN>> pendingCommitTransactionsDescriptor) {
- this.pendingTransactionsDescriptor = requireNonNull(pendingTransactionsDescriptor, "pendingTransactionsDescriptor is null");
- this.pendingCommitTransactionsDescriptor = requireNonNull(pendingCommitTransactionsDescriptor, "pendingCommitTransactionsDescriptor is null");
+ public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN>> stateDescriptor) {
+ this.stateDescriptor = requireNonNull(stateDescriptor, "stateDescriptor is null");
}
// ------ methods that should be implemented in child class to support two phase commit algorithm ------
@@ -195,11 +184,11 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
// ==> There should never be a case where we have no pending transaction here
//
- Iterator<Map.Entry<Long, TXN>> pendingTransactionsIterator = pendingCommitTransactions.entrySet().iterator();
- checkState(pendingTransactionsIterator.hasNext(), "checkpoint completed, but no transaction pending");
+ Iterator<Map.Entry<Long, TXN>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
+ checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
- while (pendingTransactionsIterator.hasNext()) {
- Map.Entry<Long, TXN> entry = pendingTransactionsIterator.next();
+ while (pendingTransactionIterator.hasNext()) {
+ Map.Entry<Long, TXN> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TXN pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
@@ -213,7 +202,7 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
- pendingTransactionsIterator.remove();
+ pendingTransactionIterator.remove();
}
}
@@ -234,13 +223,10 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
currentTransaction = beginTransaction();
LOG.debug("{} - started new transaction '{}'", name(), currentTransaction);
- pendingCommitTransactionsState.clear();
- pendingCommitTransactionsState.add(new ArrayList<>(pendingCommitTransactions.values()));
-
- pendingTransactionsState.clear();
- // in case of failure we might not be able to abort currentTransaction. Let's store it into the state
- // so it can be aborted after a restart/crash
- pendingTransactionsState.add(currentTransaction);
+ state.clear();
+ state.add(new State<>(
+ this.currentTransaction,
+ new ArrayList<>(pendingCommitTransactions.values())));
}
@Override
@@ -259,24 +245,21 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
// we can have more than one transaction to check in case of a scale-in event, or
// for the reasons discussed in the 'notifyCheckpointComplete()' method.
- pendingTransactionsState = context.getOperatorStateStore().getListState(pendingTransactionsDescriptor);
- pendingCommitTransactionsState = context.getOperatorStateStore().getListState(pendingCommitTransactionsDescriptor);
+ state = context.getOperatorStateStore().getListState(stateDescriptor);
if (context.isRestored()) {
LOG.info("{} - restoring state", name());
- for (List<TXN> recoveredTransactions : pendingCommitTransactionsState.get()) {
+ for (State<TXN> operatorState : state.get()) {
+ List<TXN> recoveredTransactions = operatorState.getPendingCommitTransactions();
for (TXN recoveredTransaction : recoveredTransactions) {
// If this fails, there is actually a data loss
recoverAndCommit(recoveredTransaction);
LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
}
- }
- // Explicitly abort transactions that could be not closed cleanly
- for (TXN pendingTransaction : pendingTransactionsState.get()) {
- recoverAndAbort(pendingTransaction);
- LOG.info("{} aborted recovered transaction {}", name(), pendingTransaction);
+ recoverAndAbort(operatorState.getPendingTransaction());
+ LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
}
} else {
LOG.info("{} - no state to restore {}", name());
@@ -304,4 +287,36 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN>
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
}
+
+ /**
+ * State POJO class coupling pendingTransaction, context and pendingCommitTransactions.
+ */
+ public static class State<TXN> {
+ protected TXN pendingTransaction;
+ protected List<TXN> pendingCommitTransactions = new ArrayList<>();
+
+ public State() {
+ }
+
+ public State(TXN pendingTransaction, List<TXN> pendingCommitTransactions) {
+ this.pendingTransaction = requireNonNull(pendingTransaction, "pendingTransaction is null");
+ this.pendingCommitTransactions = requireNonNull(pendingCommitTransactions, "pendingCommitTransactions is null");
+ }
+
+ public TXN getPendingTransaction() {
+ return pendingTransaction;
+ }
+
+ public void setPendingTransaction(TXN pendingTransaction) {
+ this.pendingTransaction = pendingTransaction;
+ }
+
+ public List<TXN> getPendingCommitTransactions() {
+ return pendingCommitTransactions;
+ }
+
+ public void setPendingCommitTransactions(List<TXN> pendingCommitTransactions) {
+ this.pendingCommitTransactions = pendingCommitTransactions;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac72360c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 9d01e74..f2fcb96 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -125,9 +125,7 @@ public class TwoPhaseCommitSinkFunctionTest {
private final File targetDirectory;
public FileBasedSinkFunction(File tmpDirectory, File targetDirectory) {
- super(
- TypeInformation.of(FileTransaction.class),
- TypeInformation.of(new TypeHint<List<FileTransaction>>() {}));
+ super(TypeInformation.of(new TypeHint<State<FileTransaction>>() {}));
if (!tmpDirectory.isDirectory() || !targetDirectory.isDirectory()) {
throw new IllegalArgumentException();