You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:03 UTC
[05/11] flink git commit: [FLINK-7838][kafka] Add missing
synchronization in FlinkKafkaProducer
[FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c123e40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c123e40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c123e40
Branch: refs/heads/master
Commit: 1c123e40e564b097fde22da648679963c40bdfe3
Parents: dc2ef4f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Oct 24 17:57:05 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:43:20 2017 +0800
----------------------------------------------------------------------
.../kafka/internal/FlinkKafkaProducer.java | 46 ++++++++++++++------
1 file changed, 33 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1c123e40/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 56b40d7..9d50379 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -181,24 +181,31 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
}
}
+ /**
+ * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones,
+ * so that we can resume transaction after a restart. Implementation of this method is based on
+ * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
+ */
public void resumeTransaction(long producerId, short epoch) {
Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch);
LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
Object transactionManager = getValue(kafkaProducer, "transactionManager");
- Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+ synchronized (transactionManager) {
+ Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
- invoke(sequenceNumbers, "clear");
+ invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+ invoke(sequenceNumbers, "clear");
- Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
- setValue(producerIdAndEpoch, "producerId", producerId);
- setValue(producerIdAndEpoch, "epoch", epoch);
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ setValue(producerIdAndEpoch, "producerId", producerId);
+ setValue(producerIdAndEpoch, "epoch", epoch);
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+ invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
- invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
- setValue(transactionManager, "transactionStarted", true);
+ invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+ setValue(transactionManager, "transactionStarted", true);
+ }
}
public String getTransactionalId() {
@@ -224,17 +231,30 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
return node.id();
}
+ /**
+ * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
+ * partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
+ * resumeTransaction simpler. Otherwise resumeTransaction would require to restore state of the not yet added/"in-flight"
+ * partitions.
+ */
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
- Object transactionManager = getValue(kafkaProducer, "transactionManager");
- Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
- invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
- TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+ TransactionalRequestResult result = enqueueNewPartitions();
Object sender = getValue(kafkaProducer, "sender");
invoke(sender, "wakeup");
result.await();
}
+ private TransactionalRequestResult enqueueNewPartitions() {
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ synchronized (transactionManager) {
+ Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+ invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+ TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+ return result;
+ }
+ }
+
private static Enum<?> getEnum(String enumFullName) {
String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
if (x.length == 2) {