You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/12 19:14:18 UTC
[kafka] branch 1.1 updated: KAFKA-6782: solved the bug of
restoration of aborted messages for GlobalStateStore and KGlobalTable
(#4900)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 8bf8dda KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900)
8bf8dda is described below
commit 8bf8ddac41dfea1f666bb50793efd8a3d7b4d5b5
Author: Gitomain <li...@gmail.com>
AuthorDate: Tue Jun 12 20:54:07 2018 +0200
KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable (#4900)
Reviewer: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.gitignore | 1 +
kafka | 1 +
.../internals/GlobalStateManagerImpl.java | 2 +-
...st.java => GlobalKTableEOSIntegrationTest.java} | 74 +++++++++++++++++++---
.../integration/GlobalKTableIntegrationTest.java | 66 +++----------------
.../integration/utils/IntegrationTestUtils.java | 35 ++++++++--
6 files changed, 103 insertions(+), 76 deletions(-)
diff --git a/.gitignore b/.gitignore
index 6088349..964c8f6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
dist
*classes
+*.class
target/
build/
build_eclipse/
diff --git a/kafka b/kafka
new file mode 160000
index 0000000..cc43e77
--- /dev/null
+++ b/kafka
@@ -0,0 +1 @@
+Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 56e6bed..64d0c1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -268,8 +268,8 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
if (record.key() != null) {
restoreRecords.add(KeyValue.pair(record.key(), record.value()));
}
- offset = globalConsumer.position(topicPartition);
}
+ offset = globalConsumer.position(topicPartition);
stateRestoreAdapter.restoreAll(restoreRecords);
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
restoreCount += restoreRecords.size();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
similarity index 83%
copy from streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
copy to streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index c934761..9c202f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -57,7 +57,7 @@ import java.util.Map;
import java.util.Properties;
@Category({IntegrationTest.class})
-public class GlobalKTableIntegrationTest {
+public class GlobalKTableEOSIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
static {
@@ -101,15 +101,15 @@ public class GlobalKTableIntegrationTest {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
- final String applicationId = "globalTableTopic-table-test-" + testNo;
+ final String applicationId = "globalTableTopic-table-eos-test-" + testNo;
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
.withKeySerde(Serdes.Long())
@@ -232,7 +232,8 @@ public class GlobalKTableIntegrationTest {
@Test
public void shouldRestoreTransactionalMessages() throws Exception {
- produceInitialGlobalTableValues(true);
+ produceInitialGlobalTableValues();
+
startStreams();
final Map<Long, String> expected = new HashMap<>();
@@ -259,7 +260,40 @@ public class GlobalKTableIntegrationTest {
return result.equals(expected);
}
}, 30000L, "waiting for initial values");
- System.out.println("no failed test");
+ }
+
+ @Test
+ public void shouldNotRestoreAbortedMessages() throws Exception {
+ produceAbortedMessages();
+ produceInitialGlobalTableValues();
+ produceAbortedMessages();
+
+ startStreams();
+
+ final Map<Long, String> expected = new HashMap<>();
+ expected.put(1L, "A");
+ expected.put(2L, "B");
+ expected.put(3L, "C");
+ expected.put(4L, "D");
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ ReadOnlyKeyValueStore<Long, String> store = null;
+ try {
+ store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ } catch (InvalidStateStoreException ex) {
+ return false;
+ }
+ Map<Long, String> result = new HashMap<>();
+ Iterator<KeyValue<Long, String>> it = store.all();
+ while (it.hasNext()) {
+ KeyValue<Long, String> kv = it.next();
+ result.put(kv.key, kv.value);
+ }
+ return result.equals(expected);
+ }
+ }, 30000L, "waiting for initial values");
}
private void createTopics() throws InterruptedException {
@@ -268,7 +302,7 @@ public class GlobalKTableIntegrationTest {
CLUSTER.createTopics(streamTopic);
CLUSTER.createTopic(globalTableTopic, 2, 1);
}
-
+
private void startStreams() {
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
@@ -291,12 +325,31 @@ public class GlobalKTableIntegrationTest {
mockTime);
}
+ private void produceAbortedMessages() throws Exception {
+ final Properties properties = new Properties();
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+ properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+ IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp(
+ globalTableTopic, Arrays.asList(
+ new KeyValue<>(1L, "A"),
+ new KeyValue<>(2L, "B"),
+ new KeyValue<>(3L, "C"),
+ new KeyValue<>(4L, "D")
+ ),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ LongSerializer.class,
+ StringSerializer.class,
+ properties),
+ mockTime.milliseconds());
+ }
+
private void produceInitialGlobalTableValues() throws Exception {
- produceInitialGlobalTableValues(false);
+ produceInitialGlobalTableValues(true);
}
private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
- Properties properties = new Properties();
+ final Properties properties = new Properties();
if (enableTransactions) {
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
@@ -307,7 +360,8 @@ public class GlobalKTableIntegrationTest {
new KeyValue<>(1L, "A"),
new KeyValue<>(2L, "B"),
new KeyValue<>(3L, "C"),
- new KeyValue<>(4L, "D")),
+ new KeyValue<>(4L, "D")
+ ),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index c934761..5723f14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,7 +27,6 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -52,23 +50,16 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
- private static final Properties BROKER_CONFIG;
- static {
- BROKER_CONFIG = new Properties();
- BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
- BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
- }
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+ new EmbeddedKafkaCluster(NUM_BROKERS);
private static volatile int testNo = 0;
private final MockTime mockTime = CLUSTER.time;
@@ -229,46 +220,14 @@ public class GlobalKTableIntegrationTest {
}
}, 30000L, "waiting for final values");
}
-
- @Test
- public void shouldRestoreTransactionalMessages() throws Exception {
- produceInitialGlobalTableValues(true);
- startStreams();
-
- final Map<Long, String> expected = new HashMap<>();
- expected.put(1L, "A");
- expected.put(2L, "B");
- expected.put(3L, "C");
- expected.put(4L, "D");
-
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- ReadOnlyKeyValueStore<Long, String> store = null;
- try {
- store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
- } catch (InvalidStateStoreException ex) {
- return false;
- }
- Map<Long, String> result = new HashMap<>();
- Iterator<KeyValue<Long, String>> it = store.all();
- while (it.hasNext()) {
- KeyValue<Long, String> kv = it.next();
- result.put(kv.key, kv.value);
- }
- return result.equals(expected);
- }
- }, 30000L, "waiting for initial values");
- System.out.println("no failed test");
- }
-
+
private void createTopics() throws InterruptedException {
streamTopic = "stream-" + testNo;
globalTableTopic = "globalTable-" + testNo;
CLUSTER.createTopics(streamTopic);
CLUSTER.createTopic(globalTableTopic, 2, 1);
}
-
+
private void startStreams() {
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
@@ -292,29 +251,20 @@ public class GlobalKTableIntegrationTest {
}
private void produceInitialGlobalTableValues() throws Exception {
- produceInitialGlobalTableValues(false);
- }
-
- private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
- Properties properties = new Properties();
- if (enableTransactions) {
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
- properties.put(ProducerConfig.RETRIES_CONFIG, 1);
- }
IntegrationTestUtils.produceKeyValuesSynchronously(
globalTableTopic,
Arrays.asList(
new KeyValue<>(1L, "A"),
new KeyValue<>(2L, "B"),
new KeyValue<>(3L, "C"),
- new KeyValue<>(4L, "D")),
+ new KeyValue<>(4L, "D")
+ ),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
LongSerializer.class,
- StringSerializer.class,
- properties),
- mockTime,
- enableTransactions);
+ StringSerializer.class
+ ),
+ mockTime);
}
private void produceGlobalTableValues() throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e8cd59e..304a3e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -140,16 +140,38 @@ public class IntegrationTestUtils {
producer.flush();
}
}
+
+ public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
+ final Collection<KeyValue<K, V>> records,
+ final Properties producerConfig,
+ final Long timestamp)
+ throws ExecutionException, InterruptedException {
+ try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+ producer.initTransactions();
+ for (final KeyValue<K, V> record : records) {
+ producer.beginTransaction();
+ final Future<RecordMetadata> f = producer
+ .send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+ f.get();
+ producer.abortTransaction();
+ }
+ }
+ }
- public static <V> void produceValuesSynchronously(
- final String topic, final Collection<V> records, final Properties producerConfig, final Time time)
+ public static <V> void produceValuesSynchronously(final String topic,
+ final Collection<V> records,
+ final Properties producerConfig,
+ final Time time)
throws ExecutionException, InterruptedException {
IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
}
- public static <V> void produceValuesSynchronously(
- final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
- throws ExecutionException, InterruptedException {
+ public static <V> void produceValuesSynchronously(final String topic,
+ final Collection<V> records,
+ final Properties producerConfig,
+ final Time time,
+ final boolean enableTransactions)
+ throws ExecutionException, InterruptedException {
final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
for (final V value : records) {
final KeyValue<Object, V> kv = new KeyValue<>(null, value);
@@ -161,10 +183,9 @@ public class IntegrationTestUtils {
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords) throws InterruptedException {
-
return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
}
-
+
/**
* Wait until enough data (key-value records) has been consumed.
*
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.