You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/10/31 09:58:51 UTC

[flink] branch master updated: [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d90e499  [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
d90e499 is described below

commit d90e49905ed6f2f9a519ac274345f08c867a999d
Author: Tony Wei <to...@gmail.com>
AuthorDate: Thu Oct 31 17:58:43 2019 +0800

    [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when enable EoS
---
 .../kafka/internal/FlinkKafkaProducer.java         | 14 ++++--
 .../connectors/kafka/FlinkKafkaProducerITCase.java | 56 ++++++++++++++++++++++
 .../kafka/internal/FlinkKafkaInternalProducer.java | 14 ++++--
 .../kafka/FlinkKafkaInternalProducerITCase.java    | 56 ++++++++++++++++++++++
 4 files changed, 134 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 9f00606..1e73139 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -306,9 +306,17 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
 	private TransactionalRequestResult enqueueNewPartitions() {
 		Object transactionManager = getValue(kafkaProducer, "transactionManager");
 		synchronized (transactionManager) {
-			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-			invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-			TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+			Object newPartitionsInTransaction = getValue(transactionManager, "newPartitionsInTransaction");
+			Object newPartitionsInTransactionIsEmpty = invoke(newPartitionsInTransaction, "isEmpty");
+			TransactionalRequestResult result;
+			if (newPartitionsInTransactionIsEmpty instanceof Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+				Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+				invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+				result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+			} else {
+				result = new TransactionalRequestResult();
+				result.done();
+			}
 			return result;
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 6e13cc6a..b6de3c7 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -22,12 +22,14 @@ import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 	protected String transactionalId;
 	protected Properties extraProperties;
 
+	@BeforeClass
+	public static void prepare() throws Exception {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting KafkaTestBase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		Properties serverProperties = new Properties();
+		serverProperties.put("transaction.state.log.num.partitions", Integer.toString(1));
+		serverProperties.put("auto.leader.rebalance.enable", Boolean.toString(false));
+		startClusters(KafkaTestEnvironment.createConfig()
+			.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+			.setSecureMode(false)
+			.setHideKafkaBehindProxy(true)
+			.setKafkaServerProperties(serverProperties));
+	}
+
 	@Before
 	public void before() {
 		transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		kafkaProducer.flush();
 	}
 
+	@Test(timeout = 30000L)
+	public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
+		String topic = "flink-kafka-producer-txn-coordinator-changed";
+		createTestTopic(topic, 1, 2);
+		try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+			kafkaProducer.flush();
+			kafkaProducer.commitTransaction();
+		}
+		deleteTestTopic(topic);
+	}
+
 	private FlinkKafkaProducer<String, String> getClosedProducer(String topicName) {
 		FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
 		kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			assertEquals(expectedValue, record.value());
 		}
 	}
+
+	private void restartBroker(int brokerId) {
+		KafkaServer toRestart = null;
+		for (KafkaServer server : kafkaServer.getBrokers()) {
+			if (kafkaServer.getBrokerId(server) == brokerId) {
+				toRestart = server;
+			}
+		}
+
+		if (toRestart == null) {
+			StringBuilder listOfBrokers = new StringBuilder();
+			for (KafkaServer server : kafkaServer.getBrokers()) {
+				listOfBrokers.append(kafkaServer.getBrokerId(server));
+				listOfBrokers.append(" ; ");
+			}
+
+			throw new IllegalArgumentException("Cannot find broker to restart: " + brokerId
+				+ " ; available brokers: " + listOfBrokers.toString());
+		} else {
+			toRestart.shutdown();
+			toRestart.awaitShutdown();
+			toRestart.startup();
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 78bbb53..8c2246e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -269,9 +269,17 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 	private TransactionalRequestResult enqueueNewPartitions() {
 		Object transactionManager = getValue(kafkaProducer, "transactionManager");
 		synchronized (transactionManager) {
-			Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
-			invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
-			TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+			Object newPartitionsInTransaction = getValue(transactionManager, "newPartitionsInTransaction");
+			Object newPartitionsInTransactionIsEmpty = invoke(newPartitionsInTransaction, "isEmpty");
+			TransactionalRequestResult result;
+			if (newPartitionsInTransactionIsEmpty instanceof Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+				Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+				invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+				result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+			} else {
+				result = new TransactionalRequestResult();
+				result.done();
+			}
 			return result;
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 2d749ba..5062b70 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -22,12 +22,14 @@ import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalPr
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
+import kafka.server.KafkaServer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 	protected String transactionalId;
 	protected Properties extraProperties;
 
+	@BeforeClass
+	public static void prepare() throws Exception {
+		LOG.info("-------------------------------------------------------------------------");
+		LOG.info("    Starting KafkaTestBase ");
+		LOG.info("-------------------------------------------------------------------------");
+
+		Properties serverProperties = new Properties();
+		serverProperties.put("transaction.state.log.num.partitions", Integer.toString(1));
+		serverProperties.put("auto.leader.rebalance.enable", Boolean.toString(false));
+		startClusters(KafkaTestEnvironment.createConfig()
+			.setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+			.setSecureMode(false)
+			.setHideKafkaBehindProxy(true)
+			.setKafkaServerProperties(serverProperties));
+	}
+
 	@Before
 	public void before() {
 		transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 		kafkaProducer.flush();
 	}
 
+	@Test(timeout = 30000L)
+	public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
+		String topic = "flink-kafka-producer-txn-coordinator-changed";
+		createTestTopic(topic, 1, 2);
+		try (Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+			kafkaProducer.initTransactions();
+			kafkaProducer.beginTransaction();
+			restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+			kafkaProducer.flush();
+			kafkaProducer.commitTransaction();
+		}
+		deleteTestTopic(topic);
+	}
+
 	private FlinkKafkaInternalProducer<String, String> getClosedProducer(String topicName) {
 		FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
 		kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 			assertEquals(expectedValue, record.value());
 		}
 	}
+
+	private void restartBroker(int brokerId) {
+		KafkaServer toRestart = null;
+		for (KafkaServer server : kafkaServer.getBrokers()) {
+			if (kafkaServer.getBrokerId(server) == brokerId) {
+				toRestart = server;
+			}
+		}
+
+		if (toRestart == null) {
+			StringBuilder listOfBrokers = new StringBuilder();
+			for (KafkaServer server : kafkaServer.getBrokers()) {
+				listOfBrokers.append(kafkaServer.getBrokerId(server));
+				listOfBrokers.append(" ; ");
+			}
+
+			throw new IllegalArgumentException("Cannot find broker to restart: " + brokerId
+				+ " ; available brokers: " + listOfBrokers.toString());
+		} else {
+			toRestart.shutdown();
+			toRestart.awaitShutdown();
+			toRestart.startup();
+		}
+	}
 }