You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/09 23:30:58 UTC
kafka git commit: KAFKA-6190: Use consumer.position() instead of
record.offset() to advance in GlobalKTable restoration to avoid transactional
control messages
Repository: kafka
Updated Branches:
refs/heads/0.11.0 1a0c00698 -> 1321d8948
KAFKA-6190: Use consumer.position() instead of record.offset() to advance in GlobalKTable restoration to avoid transactional control messages
Calculate offset using consumer.position() in GlobalStateManagerImpl#restoreState
Author: Alex Good <al...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #4197 from alexjg/0.11.0
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1321d894
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1321d894
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1321d894
Branch: refs/heads/0.11.0
Commit: 1321d89484a9a0657620b20c08ce96ee43d8a691
Parents: 1a0c006
Author: Alex Good <al...@gmail.com>
Authored: Thu Nov 9 15:30:53 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 9 15:30:53 2017 -0800
----------------------------------------------------------------------
.../internals/GlobalStateManagerImpl.java | 2 +-
.../GlobalKTableIntegrationTest.java | 56 ++++++++++++++++++--
.../integration/utils/IntegrationTestUtils.java | 41 +++++++++++++-
3 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1321d894/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 6bd699f..b7ff7ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -173,10 +173,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
while (offset < highWatermark) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
for (ConsumerRecord<byte[], byte[]> record : records) {
- offset = record.offset() + 1;
if (record.key() != null) {
stateRestoreCallback.restore(record.key(), record.value());
}
+ offset = consumer.position(topicPartition);
}
}
checkpointableOffsets.put(topicPartition, offset);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1321d894/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 869c255..7ee2d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -47,16 +49,23 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
+ private static final Properties BROKER_CONFIG;
+ static {
+ BROKER_CONFIG = new Properties();
+ BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+ BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+ }
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
private static volatile int testNo = 0;
private final MockTime mockTime = CLUSTER.time;
@@ -217,6 +226,37 @@ public class GlobalKTableIntegrationTest {
}, 30000L, "waiting for final values");
}
+ @Test
+ public void shouldRestoreTransactionalMessages() throws Exception {
+ produceInitialGlobalTableValues(true);
+ startStreams();
+
+ final Map<Long, String> expected = new HashMap<>();
+ expected.put(1L, "A");
+ expected.put(2L, "B");
+ expected.put(3L, "C");
+ expected.put(4L, "D");
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ ReadOnlyKeyValueStore<Long, String> store = null;
+ try {
+ store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ } catch (InvalidStateStoreException ex) {
+ return false;
+ }
+ Map<Long, String> result = new HashMap<>();
+ Iterator<KeyValue<Long, String>> it = store.all();
+ while (it.hasNext()) {
+ KeyValue<Long, String> kv = it.next();
+ result.put(kv.key, kv.value);
+ }
+ return result.equals(expected);
+ }
+ }, 30000L, "waiting for initial values");
+ System.out.println("no failed test");
+ }
private void createTopics() throws InterruptedException {
inputStream = "input-stream-" + testNo;
@@ -249,6 +289,15 @@ public class GlobalKTableIntegrationTest {
}
private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+ produceInitialGlobalTableValues(false);
+ }
+
+ private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException {
+ Properties properties = new Properties();
+ if (enableTransactions) {
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+ properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+ }
IntegrationTestUtils.produceKeyValuesSynchronously(
globalOne,
Arrays.asList(
@@ -260,8 +309,9 @@ public class GlobalKTableIntegrationTest {
CLUSTER.bootstrapServers(),
LongSerializer.class,
StringSerializer.class,
- new Properties()),
- mockTime);
+ properties),
+ mockTime,
+ enableTransactions);
}
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1321d894/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 00dd542..b911ebf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -86,11 +86,26 @@ public class IntegrationTestUtils {
public static <K, V> void produceKeyValuesSynchronously(
final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time)
throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
+ }
+
+ /**
+ * @param topic Kafka topic to write the data records to
+ * @param records Data records to write to Kafka
+ * @param producerConfig Kafka producer configuration
+ * @param enableTransactions Send messages in a transaction
+ * @param <K> Key type of the data records
+ * @param <V> Value type of the data records
+ */
+ public static <K, V> void produceKeyValuesSynchronously(
+ final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
+ throws ExecutionException, InterruptedException {
for (final KeyValue<K, V> record : records) {
produceKeyValuesSynchronouslyWithTimestamp(topic,
Collections.singleton(record),
producerConfig,
- time.milliseconds());
+ time.milliseconds(),
+ enableTransactions);
time.sleep(1L);
}
}
@@ -100,12 +115,28 @@ public class IntegrationTestUtils {
final Properties producerConfig,
final Long timestamp)
throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false);
+ }
+
+ public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+ final Collection<KeyValue<K, V>> records,
+ final Properties producerConfig,
+ final Long timestamp,
+ final boolean enableTransactions)
+ throws ExecutionException, InterruptedException {
try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+ if (enableTransactions) {
+ producer.initTransactions();
+ producer.beginTransaction();
+ }
for (final KeyValue<K, V> record : records) {
final Future<RecordMetadata> f = producer.send(
new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
f.get();
}
+ if (enableTransactions) {
+ producer.commitTransaction();
+ }
producer.flush();
}
}
@@ -113,12 +144,18 @@ public class IntegrationTestUtils {
public static <V> void produceValuesSynchronously(
final String topic, final Collection<V> records, final Properties producerConfig, final Time time)
throws ExecutionException, InterruptedException {
+ IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
+ }
+
+ public static <V> void produceValuesSynchronously(
+ final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
+ throws ExecutionException, InterruptedException {
final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
for (final V value : records) {
final KeyValue<Object, V> kv = new KeyValue<>(null, value);
keyedRecords.add(kv);
}
- produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time);
+ produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
}
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,