You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:03 UTC

[05/11] flink git commit: [FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer

[FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c123e40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c123e40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c123e40

Branch: refs/heads/master
Commit: 1c123e40e564b097fde22da648679963c40bdfe3
Parents: dc2ef4f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Oct 24 17:57:05 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:43:20 2017 +0800

----------------------------------------------------------------------
 .../kafka/internal/FlinkKafkaProducer.java      | 46 ++++++++++++++------
 1 file changed, 33 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1c123e40/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 56b40d7..9d50379 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -181,24 +181,31 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 		}
 	}
 
+	/**
+	 * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones,
+	 * so that we can resume the transaction after a restart. Implementation of this method is based on
+	 * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
+	 */
 	public void resumeTransaction(long producerId, short epoch) {
 		Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch);
 		LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
 
 		Object transactionManager = getValue(kafkaProducer, "transactionManager");
-		Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
+		synchronized (transactionManager) {
+			Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
 
-		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-		invoke(sequenceNumbers, "clear");
+			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+			invoke(sequenceNumbers, "clear");
 
-		Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
-		setValue(producerIdAndEpoch, "producerId", producerId);
-		setValue(producerIdAndEpoch, "epoch", epoch);
+			Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+			setValue(producerIdAndEpoch, "producerId", producerId);
+			setValue(producerIdAndEpoch, "epoch", epoch);
 
-		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
 
-		invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
-		setValue(transactionManager, "transactionStarted", true);
+			invoke(transactionManager, "transitionTo", getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+			setValue(transactionManager, "transactionStarted", true);
+		}
 	}
 
 	public String getTransactionalId() {
@@ -224,17 +231,30 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 		return node.id();
 	}
 
+	/**
+	 * Besides committing, {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} also adds new
+	 * partitions to the transaction. The flushNewPartitions method moves this logic to pre-commit/flush, to make
+	 * resumeTransaction simpler. Otherwise resumeTransaction would need to restore the state of the not-yet-added/"in-flight"
+	 * partitions.
+	 */
 	private void flushNewPartitions() {
 		LOG.info("Flushing new partitions");
-		Object transactionManager = getValue(kafkaProducer, "transactionManager");
-		Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-		invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-		TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+		TransactionalRequestResult result = enqueueNewPartitions();
 		Object sender = getValue(kafkaProducer, "sender");
 		invoke(sender, "wakeup");
 		result.await();
 	}
 
+	private TransactionalRequestResult enqueueNewPartitions() {
+		Object transactionManager = getValue(kafkaProducer, "transactionManager");
+		synchronized (transactionManager) {
+			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+			invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+			TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+			return result;
+		}
+	}
+
 	private static Enum<?> getEnum(String enumFullName) {
 		String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
 		if (x.length == 2) {