You are viewing a plain-text version of this content; the canonical (linked) version is available in the mailing-list archive.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/12 20:32:29 UTC
[kafka] branch 0.11.0 updated: KAFKA-6782: solved the bug of
restoration of aborted messages for GlobalStateStore and KGlobalTable
(#4900)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new 9bf277b KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900)
9bf277b is described below
commit 9bf277bc1a830b1f1110dc02e94f720bf7ed293a
Author: Gitomain <li...@gmail.com>
AuthorDate: Tue Jun 12 20:54:07 2018 +0200
KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.gitignore | 1 +
kafka | 1 +
.../internals/GlobalStateManagerImpl.java | 2 +-
...st.java => GlobalKTableEOSIntegrationTest.java} | 121 ++++++++++++++-------
.../integration/GlobalKTableIntegrationTest.java | 60 ++--------
.../integration/utils/IntegrationTestUtils.java | 35 ++++--
.../processor/internals/StreamTaskTest.java | 10 ++
7 files changed, 135 insertions(+), 95 deletions(-)
diff --git a/.gitignore b/.gitignore
index e8503ff..59adf08 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
dist
*classes
+*.class
target/
build/
build_eclipse/
diff --git a/kafka b/kafka
new file mode 160000
index 0000000..cc43e77
--- /dev/null
+++ b/kafka
@@ -0,0 +1 @@
+Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index b7ff7ed..1e2e5ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -176,8 +176,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
if (record.key() != null) {
stateRestoreCallback.restore(record.key(), record.value());
}
- offset = consumer.position(topicPartition);
}
+ offset = consumer.position(topicPartition);
}
checkpointableOffsets.put(topicPartition, offset);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
similarity index 76%
copy from streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
copy to streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 7ee2d4b..e5a4c6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -54,7 +53,7 @@ import java.util.Map;
import java.util.Properties;
@Category({IntegrationTest.class})
-public class GlobalKTableIntegrationTest {
+public class GlobalKTableEOSIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
static {
@@ -81,17 +80,15 @@ public class GlobalKTableIntegrationTest {
return value1 + "+" + value2;
}
};
+ private final String globalStore = "globalStore";
+ private final Map<String, String> results = new HashMap<>();
private KStreamBuilder builder;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
- private String globalOne;
- private String inputStream;
- private String inputTable;
- private final String globalStore = "globalStore";
+ private String globalTableTopic;
+ private String streamTopic;
private GlobalKTable<Long, String> globalTable;
private KStream<String, Long> stream;
- private KTable<String, Long> table;
- final Map<String, String> results = new HashMap<>();
private ForeachAction<String, String> foreachAction;
@Before
@@ -100,18 +97,17 @@ public class GlobalKTableIntegrationTest {
builder = new KStreamBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "globalOne-table-test-" + testNo;
+ final String applicationId = "globalTableTopic-table-eos-test-" + testNo;
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
- stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
- table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
+ streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+ globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalTableTopic, globalStore);
+ stream = builder.stream(Serdes.String(), Serdes.Long(), streamTopic);
foreachAction = new ForeachAction<String, String>() {
@Override
public void apply(final String key, final String value) {
@@ -134,7 +130,7 @@ public class GlobalKTableIntegrationTest {
streamTableJoin.foreach(foreachAction);
produceInitialGlobalTableValues();
startStreams();
- produceTopicValues(inputStream);
+ produceTopicValues(streamTopic);
final Map<String, String> expected = new HashMap<>();
expected.put("a", "1+A");
@@ -161,7 +157,7 @@ public class GlobalKTableIntegrationTest {
return "J".equals(replicatedStore.get(5L));
}
}, 30000, "waiting for data in replicated store");
- produceTopicValues(inputStream);
+ produceTopicValues(streamTopic);
expected.put("a", "1+F");
expected.put("b", "2+G");
@@ -183,7 +179,7 @@ public class GlobalKTableIntegrationTest {
streamTableJoin.foreach(foreachAction);
produceInitialGlobalTableValues();
startStreams();
- produceTopicValues(inputStream);
+ produceTopicValues(streamTopic);
final Map<String, String> expected = new HashMap<>();
expected.put("a", "1+A");
@@ -210,7 +206,7 @@ public class GlobalKTableIntegrationTest {
}
}, 30000, "waiting for data in replicated store");
- produceTopicValues(inputStream);
+ produceTopicValues(streamTopic);
expected.put("a", "1+F");
expected.put("b", "2+G");
@@ -228,7 +224,8 @@ public class GlobalKTableIntegrationTest {
@Test
public void shouldRestoreTransactionalMessages() throws Exception {
- produceInitialGlobalTableValues(true);
+ produceInitialGlobalTableValues();
+
startStreams();
final Map<Long, String> expected = new HashMap<>();
@@ -255,23 +252,55 @@ public class GlobalKTableIntegrationTest {
return result.equals(expected);
}
}, 30000L, "waiting for initial values");
- System.out.println("no failed test");
}
+
+ @Test
+ public void shouldNotRestoreAbortedMessages() throws Exception {
+ produceAbortedMessages();
+ produceInitialGlobalTableValues();
+ produceAbortedMessages();
- private void createTopics() throws InterruptedException {
- inputStream = "input-stream-" + testNo;
- inputTable = "input-table-" + testNo;
- globalOne = "globalOne-" + testNo;
- CLUSTER.createTopics(inputStream, inputTable);
- CLUSTER.createTopic(globalOne, 2, 1);
+ startStreams();
+
+ final Map<Long, String> expected = new HashMap<>();
+ expected.put(1L, "A");
+ expected.put(2L, "B");
+ expected.put(3L, "C");
+ expected.put(4L, "D");
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ ReadOnlyKeyValueStore<Long, String> store = null;
+ try {
+ store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ } catch (InvalidStateStoreException ex) {
+ return false;
+ }
+ Map<Long, String> result = new HashMap<>();
+ Iterator<KeyValue<Long, String>> it = store.all();
+ while (it.hasNext()) {
+ KeyValue<Long, String> kv = it.next();
+ result.put(kv.key, kv.value);
+ }
+ return result.equals(expected);
+ }
+ }, 30000L, "waiting for initial values");
}
+ private void createTopics() throws InterruptedException {
+ streamTopic = "stream-" + testNo;
+ globalTableTopic = "globalTable-" + testNo;
+ CLUSTER.createTopics(streamTopic);
+ CLUSTER.createTopic(globalTableTopic, 2, 1);
+ }
+
private void startStreams() {
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
kafkaStreams.start();
}
- private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceTopicValues(final String topic) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
topic,
Arrays.asList(
@@ -288,23 +317,43 @@ public class GlobalKTableIntegrationTest {
mockTime);
}
- private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
- produceInitialGlobalTableValues(false);
+ private void produceAbortedMessages() throws Exception {
+ final Properties properties = new Properties();
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+ properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+ IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp(
+ globalTableTopic, Arrays.asList(
+ new KeyValue<>(1L, "A"),
+ new KeyValue<>(2L, "B"),
+ new KeyValue<>(3L, "C"),
+ new KeyValue<>(4L, "D")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ LongSerializer.class,
+ StringSerializer.class,
+ properties),
+ mockTime.milliseconds());
}
- private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException {
- Properties properties = new Properties();
+ private void produceInitialGlobalTableValues() throws Exception {
+ produceInitialGlobalTableValues(true);
+ }
+
+ private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
+ final Properties properties = new Properties();
if (enableTransactions) {
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
}
IntegrationTestUtils.produceKeyValuesSynchronously(
- globalOne,
+ globalTableTopic,
Arrays.asList(
new KeyValue<>(1L, "A"),
new KeyValue<>(2L, "B"),
new KeyValue<>(3L, "C"),
- new KeyValue<>(4L, "D")),
+ new KeyValue<>(4L, "D")
+ ),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
@@ -314,9 +363,9 @@ public class GlobalKTableIntegrationTest {
enableTransactions);
}
- private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceGlobalTableValues() throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
- globalOne,
+ globalTableTopic,
Arrays.asList(
new KeyValue<>(1L, "F"),
new KeyValue<>(2L, "G"),
@@ -330,6 +379,4 @@ public class GlobalKTableIntegrationTest {
new Properties()),
mockTime);
}
-
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 7ee2d4b..9a849f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -49,23 +48,16 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
- private static final Properties BROKER_CONFIG;
- static {
- BROKER_CONFIG = new Properties();
- BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
- BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
- }
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+ new EmbeddedKafkaCluster(NUM_BROKERS);
private static volatile int testNo = 0;
private final MockTime mockTime = CLUSTER.time;
@@ -225,39 +217,7 @@ public class GlobalKTableIntegrationTest {
}
}, 30000L, "waiting for final values");
}
-
- @Test
- public void shouldRestoreTransactionalMessages() throws Exception {
- produceInitialGlobalTableValues(true);
- startStreams();
-
- final Map<Long, String> expected = new HashMap<>();
- expected.put(1L, "A");
- expected.put(2L, "B");
- expected.put(3L, "C");
- expected.put(4L, "D");
-
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- ReadOnlyKeyValueStore<Long, String> store = null;
- try {
- store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
- } catch (InvalidStateStoreException ex) {
- return false;
- }
- Map<Long, String> result = new HashMap<>();
- Iterator<KeyValue<Long, String>> it = store.all();
- while (it.hasNext()) {
- KeyValue<Long, String> kv = it.next();
- result.put(kv.key, kv.value);
- }
- return result.equals(expected);
- }
- }, 30000L, "waiting for initial values");
- System.out.println("no failed test");
- }
-
+
private void createTopics() throws InterruptedException {
inputStream = "input-stream-" + testNo;
inputTable = "input-table-" + testNo;
@@ -265,7 +225,7 @@ public class GlobalKTableIntegrationTest {
CLUSTER.createTopics(inputStream, inputTable);
CLUSTER.createTopic(globalOne, 2, 1);
}
-
+
private void startStreams() {
kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
kafkaStreams.start();
@@ -288,11 +248,11 @@ public class GlobalKTableIntegrationTest {
mockTime);
}
- private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceInitialGlobalTableValues() throws Exception {
produceInitialGlobalTableValues(false);
}
- private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
Properties properties = new Properties();
if (enableTransactions) {
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
@@ -304,14 +264,14 @@ public class GlobalKTableIntegrationTest {
new KeyValue<>(1L, "A"),
new KeyValue<>(2L, "B"),
new KeyValue<>(3L, "C"),
- new KeyValue<>(4L, "D")),
+ new KeyValue<>(4L, "D")
+ ),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
- StringSerializer.class,
- properties),
- mockTime,
- enableTransactions);
+ StringSerializer.class
+ ),
+ mockTime);
}
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index b911ebf..bb20686 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -140,16 +140,38 @@ public class IntegrationTestUtils {
producer.flush();
}
}
+
+ public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
+ final Collection<KeyValue<K, V>> records,
+ final Properties producerConfig,
+ final Long timestamp)
+ throws ExecutionException, InterruptedException {
+ try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+ producer.initTransactions();
+ for (final KeyValue<K, V> record : records) {
+ producer.beginTransaction();
+ final Future<RecordMetadata> f = producer
+ .send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+ f.get();
+ producer.abortTransaction();
+ }
+ }
+ }
- public static <V> void produceValuesSynchronously(
- final String topic, final Collection<V> records, final Properties producerConfig, final Time time)
+ public static <V> void produceValuesSynchronously(final String topic,
+ final Collection<V> records,
+ final Properties producerConfig,
+ final Time time)
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
}
- public static <V> void produceValuesSynchronously(
- final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
- throws ExecutionException, InterruptedException {
+ public static <V> void produceValuesSynchronously(final String topic,
+ final Collection<V> records,
+ final Properties producerConfig,
+ final Time time,
+ final boolean enableTransactions)
+ throws ExecutionException, InterruptedException {
final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
for (final V value : records) {
final KeyValue<Object, V> kv = new KeyValue<>(null, value);
@@ -161,10 +183,9 @@ public class IntegrationTestUtils {
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords) throws InterruptedException {
-
return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
}
-
+
/**
* Wait until enough data (key-value records) has been consumed.
*
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0ba3f1d..4bfa0a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -636,6 +636,16 @@ public class StreamTaskTest {
}
@Test
+ public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.close(false, false);
+ task = null;
+ }
+
+ @Test
public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.