You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/09 23:30:58 UTC

kafka git commit: KAFKA-6190: Use consumer.position() instead of record.offset() to advance in GlobalKTable restoration to avoid transactional control messages

Repository: kafka
Updated Branches:
  refs/heads/0.11.0 1a0c00698 -> 1321d8948


KAFKA-6190: Use consumer.position() instead of record.offset() to advance in GlobalKTable restoration to avoid transactional control messages

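Calculate the restore offset using consumer.position() instead of record.offset() + 1 in GlobalStateManagerImpl#restoreState, so that transactional control messages, which are never returned to the application as records, are skipped.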

Author: Alex Good <al...@gmail.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #4197 from alexjg/0.11.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1321d894
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1321d894
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1321d894

Branch: refs/heads/0.11.0
Commit: 1321d89484a9a0657620b20c08ce96ee43d8a691
Parents: 1a0c006
Author: Alex Good <al...@gmail.com>
Authored: Thu Nov 9 15:30:53 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 9 15:30:53 2017 -0800

----------------------------------------------------------------------
 .../internals/GlobalStateManagerImpl.java       |  2 +-
 .../GlobalKTableIntegrationTest.java            | 56 ++++++++++++++++++--
 .../integration/utils/IntegrationTestUtils.java | 41 +++++++++++++-
 3 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1321d894/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 6bd699f..b7ff7ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -173,10 +173,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
             while (offset < highWatermark) {
                 final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                 for (ConsumerRecord<byte[], byte[]> record : records) {
-                    offset = record.offset() + 1;
                     if (record.key() != null) {
                         stateRestoreCallback.restore(record.key(), record.value());
                     }
+                    offset = consumer.position(topicPartition);
                 }
             }
             checkpointableOffsets.put(topicPartition, offset);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1321d894/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 869c255..7ee2d4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.integration;
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -47,16 +49,23 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final Properties BROKER_CONFIG;
+    static {
+        BROKER_CONFIG = new Properties();
+        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+    }
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS);
+            new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
@@ -217,6 +226,37 @@ public class GlobalKTableIntegrationTest {
         }, 30000L, "waiting for final values");
     }
 
+    @Test
+    public void shouldRestoreTransactionalMessages() throws Exception {
+        produceInitialGlobalTableValues(true);
+        startStreams();
+
+        final Map<Long, String> expected = new HashMap<>();
+        expected.put(1L, "A");
+        expected.put(2L, "B");
+        expected.put(3L, "C");
+        expected.put(4L, "D");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                ReadOnlyKeyValueStore<Long, String> store = null;
+                try {
+                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+                } catch (InvalidStateStoreException ex) {
+                    return false;
+                }
+                Map<Long, String> result = new HashMap<>();
+                Iterator<KeyValue<Long, String>> it = store.all();
+                while (it.hasNext()) {
+                    KeyValue<Long, String> kv = it.next();
+                    result.put(kv.key, kv.value);
+                }
+                return result.equals(expected);
+            }
+        }, 30000L, "waiting for initial values");
+        System.out.println("no failed test");
+    }
 
     private void createTopics() throws InterruptedException {
         inputStream = "input-stream-" + testNo;
@@ -249,6 +289,15 @@ public class GlobalKTableIntegrationTest {
     }
 
     private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+        produceInitialGlobalTableValues(false);
+    }
+
+    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException {
+        Properties properties = new Properties();
+        if (enableTransactions) {
+            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+            properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+        }
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 globalOne,
                 Arrays.asList(
@@ -260,8 +309,9 @@ public class GlobalKTableIntegrationTest {
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
                         StringSerializer.class,
-                        new Properties()),
-                mockTime);
+                        properties),
+                mockTime,
+                enableTransactions);
     }
 
     private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1321d894/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 00dd542..b911ebf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -86,11 +86,26 @@ public class IntegrationTestUtils {
     public static <K, V> void produceKeyValuesSynchronously(
         final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, time, false);
+    }
+
+    /**
+     * @param topic               Kafka topic to write the data records to
+     * @param records             Data records to write to Kafka
+     * @param producerConfig      Kafka producer configuration
+     * @param enableTransactions  Send messages in a transaction
+     * @param <K>                 Key type of the data records
+     * @param <V>                 Value type of the data records
+     */
+    public static <K, V> void produceKeyValuesSynchronously(
+        final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         for (final KeyValue<K, V> record : records) {
             produceKeyValuesSynchronouslyWithTimestamp(topic,
                 Collections.singleton(record),
                 producerConfig,
-                time.milliseconds());
+                time.milliseconds(),
+                enableTransactions);
             time.sleep(1L);
         }
     }
@@ -100,12 +115,28 @@ public class IntegrationTestUtils {
                                                                          final Properties producerConfig,
                                                                          final Long timestamp)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                         final Collection<KeyValue<K, V>> records,
+                                                                         final Properties producerConfig,
+                                                                         final Long timestamp,
+                                                                         final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+            if (enableTransactions) {
+                producer.initTransactions();
+                producer.beginTransaction();
+            }
             for (final KeyValue<K, V> record : records) {
                 final Future<RecordMetadata> f = producer.send(
                     new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
                 f.get();
             }
+            if (enableTransactions) {
+                producer.commitTransaction();
+            }
             producer.flush();
         }
     }
@@ -113,12 +144,18 @@ public class IntegrationTestUtils {
     public static <V> void produceValuesSynchronously(
         final String topic, final Collection<V> records, final Properties producerConfig, final Time time)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
+    }
+
+    public static <V> void produceValuesSynchronously(
+        final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
         for (final V value : records) {
             final KeyValue<Object, V> kv = new KeyValue<>(null, value);
             keyedRecords.add(kv);
         }
-        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time);
+        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
     }
 
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,