You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/12 20:32:29 UTC

[kafka] branch 0.11.0 updated: KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and GlobalKTable (#4900)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 9bf277b  KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and GlobalKTable (#4900)
9bf277b is described below

commit 9bf277bc1a830b1f1110dc02e94f720bf7ed293a
Author: Gitomain <li...@gmail.com>
AuthorDate: Tue Jun 12 20:54:07 2018 +0200

    KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and GlobalKTable (#4900)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .gitignore                                         |   1 +
 kafka                                              |   1 +
 .../internals/GlobalStateManagerImpl.java          |   2 +-
 ...st.java => GlobalKTableEOSIntegrationTest.java} | 121 ++++++++++++++-------
 .../integration/GlobalKTableIntegrationTest.java   |  60 ++--------
 .../integration/utils/IntegrationTestUtils.java    |  35 ++++--
 .../processor/internals/StreamTaskTest.java        |  10 ++
 7 files changed, 135 insertions(+), 95 deletions(-)

diff --git a/.gitignore b/.gitignore
index e8503ff..59adf08 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 dist
 *classes
+*.class
 target/
 build/
 build_eclipse/
diff --git a/kafka b/kafka
new file mode 160000
index 0000000..cc43e77
--- /dev/null
+++ b/kafka
@@ -0,0 +1 @@
+Subproject commit cc43e77bbbfad71883011186de55603c936cbcd1
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index b7ff7ed..1e2e5ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -176,8 +176,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                     if (record.key() != null) {
                         stateRestoreCallback.restore(record.key(), record.value());
                     }
-                    offset = consumer.position(topicPartition);
                 }
+                offset = consumer.position(topicPartition);
             }
             checkpointableOffsets.put(topicPartition, offset);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
similarity index 76%
copy from streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
copy to streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 7ee2d4b..e5a4c6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -54,7 +53,7 @@ import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
-public class GlobalKTableIntegrationTest {
+public class GlobalKTableEOSIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final Properties BROKER_CONFIG;
     static {
@@ -81,17 +80,15 @@ public class GlobalKTableIntegrationTest {
             return value1 + "+" + value2;
         }
     };
+    private final String globalStore = "globalStore";
+    private final Map<String, String> results = new HashMap<>();
     private KStreamBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
-    private String globalOne;
-    private String inputStream;
-    private String inputTable;
-    private final String globalStore = "globalStore";
+    private String globalTableTopic;
+    private String streamTopic;
     private GlobalKTable<Long, String> globalTable;
     private KStream<String, Long> stream;
-    private KTable<String, Long> table;
-    final Map<String, String> results = new HashMap<>();
     private ForeachAction<String, String> foreachAction;
 
     @Before
@@ -100,18 +97,17 @@ public class GlobalKTableIntegrationTest {
         builder = new KStreamBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "globalOne-table-test-" + testNo;
+        final String applicationId = "globalTableTopic-table-eos-test-" + testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-                .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
-        stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
-        table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalTableTopic, globalStore);
+        stream = builder.stream(Serdes.String(), Serdes.Long(), streamTopic);
         foreachAction = new ForeachAction<String, String>() {
             @Override
             public void apply(final String key, final String value) {
@@ -134,7 +130,7 @@ public class GlobalKTableIntegrationTest {
         streamTableJoin.foreach(foreachAction);
         produceInitialGlobalTableValues();
         startStreams();
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         final Map<String, String> expected = new HashMap<>();
         expected.put("a", "1+A");
@@ -161,7 +157,7 @@ public class GlobalKTableIntegrationTest {
                 return "J".equals(replicatedStore.get(5L));
             }
         }, 30000, "waiting for data in replicated store");
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         expected.put("a", "1+F");
         expected.put("b", "2+G");
@@ -183,7 +179,7 @@ public class GlobalKTableIntegrationTest {
         streamTableJoin.foreach(foreachAction);
         produceInitialGlobalTableValues();
         startStreams();
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         final Map<String, String> expected = new HashMap<>();
         expected.put("a", "1+A");
@@ -210,7 +206,7 @@ public class GlobalKTableIntegrationTest {
             }
         }, 30000, "waiting for data in replicated store");
 
-        produceTopicValues(inputStream);
+        produceTopicValues(streamTopic);
 
         expected.put("a", "1+F");
         expected.put("b", "2+G");
@@ -228,7 +224,8 @@ public class GlobalKTableIntegrationTest {
 
     @Test
     public void shouldRestoreTransactionalMessages() throws Exception {
-        produceInitialGlobalTableValues(true);
+        produceInitialGlobalTableValues();
+
         startStreams();
 
         final Map<Long, String> expected = new HashMap<>();
@@ -255,23 +252,55 @@ public class GlobalKTableIntegrationTest {
                 return result.equals(expected);
             }
         }, 30000L, "waiting for initial values");
-        System.out.println("no failed test");
     }
+    
+    @Test
+    public void shouldNotRestoreAbortedMessages() throws Exception {
+        produceAbortedMessages();
+        produceInitialGlobalTableValues();
+        produceAbortedMessages();
 
-    private void createTopics() throws InterruptedException {
-        inputStream = "input-stream-" + testNo;
-        inputTable = "input-table-" + testNo;
-        globalOne = "globalOne-" + testNo;
-        CLUSTER.createTopics(inputStream, inputTable);
-        CLUSTER.createTopic(globalOne, 2, 1);
+        startStreams();
+        
+        final Map<Long, String> expected = new HashMap<>();
+        expected.put(1L, "A");
+        expected.put(2L, "B");
+        expected.put(3L, "C");
+        expected.put(4L, "D");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                ReadOnlyKeyValueStore<Long, String> store = null;
+                try {
+                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+                } catch (InvalidStateStoreException ex) {
+                    return false;
+                }
+                Map<Long, String> result = new HashMap<>();
+                Iterator<KeyValue<Long, String>> it = store.all();
+                while (it.hasNext()) {
+                    KeyValue<Long, String> kv = it.next();
+                    result.put(kv.key, kv.value);
+                }
+                return result.equals(expected);
+            }
+        }, 30000L, "waiting for initial values");
     }
 
+    private void createTopics() throws InterruptedException {
+        streamTopic = "stream-" + testNo;
+        globalTableTopic = "globalTable-" + testNo;
+        CLUSTER.createTopics(streamTopic);
+        CLUSTER.createTopic(globalTableTopic, 2, 1);
+    }
+    
     private void startStreams() {
         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
         kafkaStreams.start();
     }
 
-    private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceTopicValues(final String topic) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 topic,
                 Arrays.asList(
@@ -288,23 +317,43 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
-    private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
-        produceInitialGlobalTableValues(false);
+    private void produceAbortedMessages() throws Exception {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+        IntegrationTestUtils.produceAbortedKeyValuesSynchronouslyWithTimestamp(
+                globalTableTopic, Arrays.asList(
+                        new KeyValue<>(1L, "A"),
+                        new KeyValue<>(2L, "B"),
+                        new KeyValue<>(3L, "C"),
+                        new KeyValue<>(4L, "D")
+                        ), 
+                TestUtils.producerConfig(
+                                CLUSTER.bootstrapServers(),
+                                LongSerializer.class,
+                                StringSerializer.class,
+                                properties),
+                mockTime.milliseconds());
     }
 
-    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException {
-        Properties properties = new Properties();
+    private void produceInitialGlobalTableValues() throws Exception {
+        produceInitialGlobalTableValues(true);
+    }
+
+    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
+        final Properties properties = new Properties();
         if (enableTransactions) {
             properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
             properties.put(ProducerConfig.RETRIES_CONFIG, 1);
         }
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                globalOne,
+                globalTableTopic,
                 Arrays.asList(
                         new KeyValue<>(1L, "A"),
                         new KeyValue<>(2L, "B"),
                         new KeyValue<>(3L, "C"),
-                        new KeyValue<>(4L, "D")),
+                        new KeyValue<>(4L, "D")
+                        ),
                 TestUtils.producerConfig(
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
@@ -314,9 +363,9 @@ public class GlobalKTableIntegrationTest {
                 enableTransactions);
     }
 
-    private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceGlobalTableValues() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                globalOne,
+                globalTableTopic,
                 Arrays.asList(
                         new KeyValue<>(1L, "F"),
                         new KeyValue<>(2L, "G"),
@@ -330,6 +379,4 @@ public class GlobalKTableIntegrationTest {
                         new Properties()),
                 mockTime);
     }
-
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 7ee2d4b..9a849f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -49,23 +48,16 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    private static final Properties BROKER_CONFIG;
-    static {
-        BROKER_CONFIG = new Properties();
-        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
-        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
-    }
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+            new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
@@ -225,39 +217,7 @@ public class GlobalKTableIntegrationTest {
             }
         }, 30000L, "waiting for final values");
     }
-
-    @Test
-    public void shouldRestoreTransactionalMessages() throws Exception {
-        produceInitialGlobalTableValues(true);
-        startStreams();
-
-        final Map<Long, String> expected = new HashMap<>();
-        expected.put(1L, "A");
-        expected.put(2L, "B");
-        expected.put(3L, "C");
-        expected.put(4L, "D");
-
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                ReadOnlyKeyValueStore<Long, String> store = null;
-                try {
-                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
-                } catch (InvalidStateStoreException ex) {
-                    return false;
-                }
-                Map<Long, String> result = new HashMap<>();
-                Iterator<KeyValue<Long, String>> it = store.all();
-                while (it.hasNext()) {
-                    KeyValue<Long, String> kv = it.next();
-                    result.put(kv.key, kv.value);
-                }
-                return result.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
-        System.out.println("no failed test");
-    }
-
+    
     private void createTopics() throws InterruptedException {
         inputStream = "input-stream-" + testNo;
         inputTable = "input-table-" + testNo;
@@ -265,7 +225,7 @@ public class GlobalKTableIntegrationTest {
         CLUSTER.createTopics(inputStream, inputTable);
         CLUSTER.createTopic(globalOne, 2, 1);
     }
-
+    
     private void startStreams() {
         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
         kafkaStreams.start();
@@ -288,11 +248,11 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
-    private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceInitialGlobalTableValues() throws Exception {
         produceInitialGlobalTableValues(false);
     }
 
-    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws Exception {
         Properties properties = new Properties();
         if (enableTransactions) {
             properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
@@ -304,14 +264,14 @@ public class GlobalKTableIntegrationTest {
                         new KeyValue<>(1L, "A"),
                         new KeyValue<>(2L, "B"),
                         new KeyValue<>(3L, "C"),
-                        new KeyValue<>(4L, "D")),
+                        new KeyValue<>(4L, "D")
+                        ),
                 TestUtils.producerConfig(
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
-                        StringSerializer.class,
-                        properties),
-                mockTime,
-                enableTransactions);
+                        StringSerializer.class
+                        ),
+                mockTime);
     }
 
     private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index b911ebf..bb20686 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -140,16 +140,38 @@ public class IntegrationTestUtils {
             producer.flush();
         }
     }
+    
+    public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
+                                                                                final Collection<KeyValue<K, V>> records,
+                                                                                final Properties producerConfig,
+                                                                                final Long timestamp)
+        throws ExecutionException, InterruptedException {
+        try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+            producer.initTransactions();
+            for (final KeyValue<K, V> record : records) {
+                producer.beginTransaction();
+                final Future<RecordMetadata> f = producer
+                        .send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
+                f.get();
+                producer.abortTransaction();
+            }
+        }    
+    }
 
-    public static <V> void produceValuesSynchronously(
-        final String topic, final Collection<V> records, final Properties producerConfig, final Time time)
+    public static <V> void produceValuesSynchronously(final String topic,
+                                                      final Collection<V> records,
+                                                      final Properties producerConfig,
+                                                      final Time time)
         throws ExecutionException, InterruptedException {
         IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time, false);
     }
 
-    public static <V> void produceValuesSynchronously(
-        final String topic, final Collection<V> records, final Properties producerConfig, final Time time, final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
+    public static <V> void produceValuesSynchronously(final String topic,
+                                                      final Collection<V> records,
+                                                      final Properties producerConfig,
+                                                      final Time time,
+                                                      final boolean enableTransactions)
+            throws ExecutionException, InterruptedException {
         final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
         for (final V value : records) {
             final KeyValue<Object, V> kv = new KeyValue<>(null, value);
@@ -161,10 +183,9 @@ public class IntegrationTestUtils {
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                   final String topic,
                                                                                   final int expectedNumRecords) throws InterruptedException {
-
         return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
     }
-
+    
     /**
      * Wait until enough data (key-value records) has been consumed.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0ba3f1d..4bfa0a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -636,6 +636,16 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.close(false, false);
+        task = null;
+    }
+
+    @Test
     public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.