You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2019/10/31 09:58:51 UTC
[flink] branch master updated: [FLINK-14302] [kafka]
FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request
if `newPartitionsInTransaction` is empty when EoS is enabled
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d90e499 [FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when EoS is enabled
d90e499 is described below
commit d90e49905ed6f2f9a519ac274345f08c867a999d
Author: Tony Wei <to...@gmail.com>
AuthorDate: Thu Oct 31 17:58:43 2019 +0800
[FLINK-14302] [kafka] FlinkKafkaInternalProducer should not send `ADD_PARTITIONS_TO_TXN` request if `newPartitionsInTransaction` is empty when EoS is enabled
---
.../kafka/internal/FlinkKafkaProducer.java | 14 ++++--
.../connectors/kafka/FlinkKafkaProducerITCase.java | 56 ++++++++++++++++++++++
.../kafka/internal/FlinkKafkaInternalProducer.java | 14 ++++--
.../kafka/FlinkKafkaInternalProducerITCase.java | 56 ++++++++++++++++++++++
4 files changed, 134 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 9f00606..1e73139 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -306,9 +306,17 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
synchronized (transactionManager) {
- Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
- invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
- TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+ Object newPartitionsInTransaction = getValue(transactionManager, "newPartitionsInTransaction");
+ Object newPartitionsInTransactionIsEmpty = invoke(newPartitionsInTransaction, "isEmpty");
+ TransactionalRequestResult result;
+ if (newPartitionsInTransactionIsEmpty instanceof Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+ Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+ invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+ result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+ } else {
+ result = new TransactionalRequestResult();
+ result.done();
+ }
return result;
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 6e13cc6a..b6de3c7 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -22,12 +22,14 @@ import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
protected String transactionalId;
protected Properties extraProperties;
+ @BeforeClass
+ public static void prepare() throws Exception {
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting KafkaTestBase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ Properties serverProperties = new Properties();
+ serverProperties.put("transaction.state.log.num.partitions", Integer.toString(1));
+ serverProperties.put("auto.leader.rebalance.enable", Boolean.toString(false));
+ startClusters(KafkaTestEnvironment.createConfig()
+ .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+ .setSecureMode(false)
+ .setHideKafkaBehindProxy(true)
+ .setKafkaServerProperties(serverProperties));
+ }
+
@Before
public void before() {
transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
kafkaProducer.flush();
}
+ @Test(timeout = 30000L)
+ public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
+ String topic = "flink-kafka-producer-txn-coordinator-changed";
+ createTestTopic(topic, 1, 2);
+ try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+ kafkaProducer.flush();
+ kafkaProducer.commitTransaction();
+ }
+ deleteTestTopic(topic);
+ }
+
private FlinkKafkaProducer<String, String> getClosedProducer(String topicName) {
FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties);
kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
assertEquals(expectedValue, record.value());
}
}
+
+ private void restartBroker(int brokerId) {
+ KafkaServer toRestart = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ if (kafkaServer.getBrokerId(server) == brokerId) {
+ toRestart = server;
+ }
+ }
+
+ if (toRestart == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new IllegalArgumentException("Cannot find broker to restart: " + brokerId
+ + " ; available brokers: " + listOfBrokers.toString());
+ } else {
+ toRestart.shutdown();
+ toRestart.awaitShutdown();
+ toRestart.startup();
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
index 78bbb53..8c2246e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java
@@ -269,9 +269,17 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
private TransactionalRequestResult enqueueNewPartitions() {
Object transactionManager = getValue(kafkaProducer, "transactionManager");
synchronized (transactionManager) {
- Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
- invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
- TransactionalRequestResult result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+ Object newPartitionsInTransaction = getValue(transactionManager, "newPartitionsInTransaction");
+ Object newPartitionsInTransactionIsEmpty = invoke(newPartitionsInTransaction, "isEmpty");
+ TransactionalRequestResult result;
+ if (newPartitionsInTransactionIsEmpty instanceof Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
+ Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+ invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
+ result = (TransactionalRequestResult) getValue(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
+ } else {
+ result = new TransactionalRequestResult();
+ result.done();
+ }
return result;
}
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 2d749ba..5062b70 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -22,12 +22,14 @@ import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalPr
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
@@ -45,6 +47,22 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
protected String transactionalId;
protected Properties extraProperties;
+ @BeforeClass
+ public static void prepare() throws Exception {
+ LOG.info("-------------------------------------------------------------------------");
+ LOG.info(" Starting KafkaTestBase ");
+ LOG.info("-------------------------------------------------------------------------");
+
+ Properties serverProperties = new Properties();
+ serverProperties.put("transaction.state.log.num.partitions", Integer.toString(1));
+ serverProperties.put("auto.leader.rebalance.enable", Boolean.toString(false));
+ startClusters(KafkaTestEnvironment.createConfig()
+ .setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS)
+ .setSecureMode(false)
+ .setHideKafkaBehindProxy(true)
+ .setKafkaServerProperties(serverProperties));
+ }
+
@Before
public void before() {
transactionalId = UUID.randomUUID().toString();
@@ -152,6 +170,20 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
kafkaProducer.flush();
}
+ @Test(timeout = 30000L)
+ public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
+ String topic = "flink-kafka-producer-txn-coordinator-changed";
+ createTestTopic(topic, 1, 2);
+ try (Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties)) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+ restartBroker(kafkaServer.getLeaderToShutDown("__transaction_state"));
+ kafkaProducer.flush();
+ kafkaProducer.commitTransaction();
+ }
+ deleteTestTopic(topic);
+ }
+
private FlinkKafkaInternalProducer<String, String> getClosedProducer(String topicName) {
FlinkKafkaInternalProducer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
@@ -171,4 +203,28 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
assertEquals(expectedValue, record.value());
}
}
+
+ private void restartBroker(int brokerId) {
+ KafkaServer toRestart = null;
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ if (kafkaServer.getBrokerId(server) == brokerId) {
+ toRestart = server;
+ }
+ }
+
+ if (toRestart == null) {
+ StringBuilder listOfBrokers = new StringBuilder();
+ for (KafkaServer server : kafkaServer.getBrokers()) {
+ listOfBrokers.append(kafkaServer.getBrokerId(server));
+ listOfBrokers.append(" ; ");
+ }
+
+ throw new IllegalArgumentException("Cannot find broker to restart: " + brokerId
+ + " ; available brokers: " + listOfBrokers.toString());
+ } else {
+ toRestart.shutdown();
+ toRestart.awaitShutdown();
+ toRestart.startup();
+ }
+ }
}