You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/11/02 10:00:08 UTC

[10/11] flink git commit: [hotfix][kafka] Do not return producers to a pool in abort for non EXACTLY_ONCE modes

[hotfix][kafka] Do not return producers to a pool in abort for non EXACTLY_ONCE modes

Previously, on abort(...), producers were returned to the pool even for
non-EXACTLY_ONCE modes. This was a minor bug, probably without any negative
side effects; this patch fixes it and adds sanity checks to guard against
similar bugs in the future.

This closes #4915.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51e9a015
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51e9a015
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51e9a015

Branch: refs/heads/master
Commit: 51e9a0159ba215e3779e4ef6e0ceb77d3df48f7d
Parents: 5058c3f
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Oct 27 15:47:26 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Nov 2 12:44:16 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 28 ++++++++++++--------
 1 file changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51e9a015/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index f349df3..c27c620 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -226,7 +225,7 @@ public class FlinkKafkaProducer011<IN>
 	/**
 	 * Pool of KafkaProducers objects.
 	 */
-	private transient ProducersPool producersPool = new ProducersPool();
+	private transient Optional<ProducersPool> producersPool = Optional.empty();
 
 	/**
 	 * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -596,7 +595,7 @@ public class FlinkKafkaProducer011<IN>
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
 		}
 		try {
-			producersPool.close();
+			producersPool.ifPresent(pool -> pool.close());
 		}
 		catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
@@ -628,7 +627,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception {
-		FlinkKafkaProducer<byte[], byte[]> producer = producersPool.poll();
+		FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
 		if (producer == null) {
 			String transactionalId = availableTransactionalIds.poll();
 			if (transactionalId == null) {
@@ -661,7 +660,7 @@ public class FlinkKafkaProducer011<IN>
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.commitTransaction();
-				producersPool.add(transaction.producer);
+				getProducersPool().add(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
@@ -703,11 +702,10 @@ public class FlinkKafkaProducer011<IN>
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				transaction.producer.abortTransaction();
-				producersPool.add(transaction.producer);
+				getProducersPool().add(transaction.producer);
 				break;
 			case AT_LEAST_ONCE:
 			case NONE:
-				producersPool.add(transaction.producer);
 				break;
 			default:
 				throw new UnsupportedOperationException("Not implemented semantic");
@@ -760,7 +758,8 @@ public class FlinkKafkaProducer011<IN>
 		nextTransactionalIdHintState.clear();
 		// To avoid duplication only first subtask keeps track of next transactional id hint. Otherwise all of the
 		// subtasks would write exactly same information.
-		if (getRuntimeContext().getIndexOfThisSubtask() == 0 && nextTransactionalIdHint != null) {
+		if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
+			checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
 			long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
 
 			// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
@@ -788,7 +787,10 @@ public class FlinkKafkaProducer011<IN>
 
 		if (semantic != Semantic.EXACTLY_ONCE) {
 			nextTransactionalIdHint = null;
+			producersPool = Optional.empty();
 		} else {
+			producersPool = Optional.of(new ProducersPool());
+
 			ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
 			if (transactionalIdHints.size() > 1) {
 				throw new IllegalStateException(
@@ -829,8 +831,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	private Set<String> generateNewTransactionalIds() {
-		Preconditions.checkState(nextTransactionalIdHint != null,
-			"nextTransactionalIdHint must be present for EXACTLY_ONCE");
+		checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be present for EXACTLY_ONCE");
 
 		// range of available transactional ids is:
 		// [nextFreeTransactionalId, nextFreeTransactionalId + parallelism * kafkaProducersPoolSize)
@@ -903,6 +904,11 @@ public class FlinkKafkaProducer011<IN>
 		return initProducer(registerMetrics);
 	}
 
+	private ProducersPool getProducersPool() {
+		checkState(producersPool.isPresent(), "Trying to access uninitialized producer pool");
+		return producersPool.get();
+	}
+
 	private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
 		FlinkKafkaProducer<byte[], byte[]> producer = new FlinkKafkaProducer<>(this.producerConfig);
 
@@ -958,7 +964,7 @@ public class FlinkKafkaProducer011<IN>
 
 	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		producersPool = new ProducersPool();
+		producersPool = Optional.empty();
 	}
 
 	private static Properties getPropertiesFromBrokerList(String brokerList) {