Posted to commits@kafka.apache.org by mj...@apache.org on 2020/04/10 18:55:28 UTC

[kafka] branch trunk updated: KAFKA-9832: Extend Streams system tests for EOS-beta (#8443)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 20e4a74  KAFKA-9832: Extend Streams system tests for EOS-beta (#8443)
20e4a74 is described below

commit 20e4a74c3579837df7b46ce1a5dcea37b6e1e452
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 10 11:55:01 2020 -0700

    KAFKA-9832: Extend Streams system tests for EOS-beta (#8443)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/ActiveTaskCreator.java     |  3 ---
 .../processor/internals/StreamsProducer.java       |  4 +++-
 .../processor/internals/StreamsProducerTest.java   | 18 ++++++++--------
 .../kafka/streams/tests/RelationalSmokeTest.java   | 11 ++++++----
 .../streams/tests/RelationalSmokeTestTest.java     |  3 +++
 .../kafka/streams/tests/StreamsSmokeTest.java      | 18 +++++++++++-----
 tests/kafkatest/services/streams.py                | 13 +++++-------
 tests/kafkatest/services/streams_property.py       |  1 +
 .../tests/streams/streams_relational_smoke_test.py | 21 +++++++++++--------
 .../kafkatest/tests/streams/streams_smoke_test.py  | 24 +++++++---------------
 10 files changed, 60 insertions(+), 56 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index ad1a3cc..5473d7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -62,7 +62,6 @@ class ActiveTaskCreator {
     private final StreamsProducer threadProducer;
     private final Map<TaskId, StreamsProducer> taskProducers;
     private final StreamThread.ProcessingMode processingMode;
-    private final String transactionalId;
 
     ActiveTaskCreator(final InternalTopologyBuilder builder,
                       final StreamsConfig config,
@@ -87,8 +86,6 @@ class ActiveTaskCreator {
         this.log = log;
 
         createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
-        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        transactionalId = applicationId + "-" + processId + "-StreamThread-" + threadId.split("-StreamThread-")[1];
         processingMode = StreamThread.processingMode(config);
 
         if (processingMode == EXACTLY_ONCE_ALPHA) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 5f3c386..c39ff20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -113,7 +113,9 @@ public class StreamsProducer {
                 final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
                 producerConfigs.put(
                     ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                    applicationId + "-" + Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta"));
+                    applicationId + "-" +
+                        Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta") +
+                        "-" + threadId.split("-StreamThread-")[1]);
 
                 eosBetaProducerConfigs = producerConfigs;
 
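The two hunks above move the transactional-id construction out of ActiveTaskCreator and into StreamsProducer: under exactly-once beta the id now embeds both the process UUID and the thread index parsed from the thread name. A minimal sketch of the resulting format, assuming the usual "<clientId>-StreamThread-<n>" thread naming (the helper below is illustrative, not Kafka code):

    import uuid

    def eos_beta_transactional_id(application_id, process_id, thread_id):
        # thread_id looks like "myClientId-StreamThread-2"; keep only the trailing index
        thread_idx = thread_id.split("-StreamThread-")[1]
        return "%s-%s-%s" % (application_id, process_id, thread_idx)

    # prints e.g. "myApp-<process UUID>-2"
    print(eos_beta_transactional_id("myApp", uuid.uuid4(), "myClientId-StreamThread-2"))
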
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index 12779d4..c46a4c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -142,7 +142,7 @@ public class StreamsProducerTest {
         nonEosStreamsProducer =
             new StreamsProducer(
                 nonEosConfig,
-                "threadId",
+                "threadId-StreamThread-0",
                 mockClientSupplier,
                 null,
                 null,
@@ -155,7 +155,7 @@ public class StreamsProducerTest {
         eosAlphaStreamsProducer =
             new StreamsProducer(
                 eosAlphaConfig,
-                "threadId",
+                "threadId-StreamThread-0",
                 eosAlphaMockClientSupplier,
                 new TaskId(0, 0),
                 null,
@@ -169,7 +169,7 @@ public class StreamsProducerTest {
         eosBetaStreamsProducer =
             new StreamsProducer(
                 eosBetaConfig,
-                "threadId",
+                "threadId-StreamThread-0",
                 eosBetaMockClientSupplier,
                 null,
                 UUID.randomUUID(),
@@ -470,11 +470,11 @@ public class StreamsProducerTest {
         final UUID processId = UUID.randomUUID();
 
         final Map<String, Object> mockMap = mock(Map.class);
-        expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + processId)).andReturn(null);
+        expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + processId + "-0")).andReturn(null);
         expect(mockMap.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn("appId-" + processId);
 
         final StreamsConfig mockConfig = mock(StreamsConfig.class);
-        expect(mockConfig.getProducerConfigs("threadId-producer")).andReturn(mockMap);
+        expect(mockConfig.getProducerConfigs("threadId-StreamThread-0-producer")).andReturn(mockMap);
         expect(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("appId");
         expect(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).andReturn(StreamsConfig.EXACTLY_ONCE_BETA).anyTimes();
 
@@ -482,7 +482,7 @@ public class StreamsProducerTest {
 
         new StreamsProducer(
             mockConfig,
-            "threadId",
+            "threadId-StreamThread-0",
             eosAlphaMockClientSupplier,
             null,
             processId,
@@ -605,7 +605,7 @@ public class StreamsProducerTest {
 
         final StreamsProducer streamsProducer = new StreamsProducer(
             eosBetaConfig,
-            "threadId",
+            "threadId-StreamThread-0",
             clientSupplier,
             null,
             UUID.randomUUID(),
@@ -737,7 +737,7 @@ public class StreamsProducerTest {
         final StreamsProducer streamsProducer =
             new StreamsProducer(
                 eosBetaConfig,
-                "threadId",
+                "threadId-StreamThread-0",
                 eosBetaMockClientSupplier,
                 null,
                 UUID.randomUUID(),
@@ -1076,7 +1076,7 @@ public class StreamsProducerTest {
     public void shouldResetTransactionInitializedOnResetProducer() {
         final StreamsProducer streamsProducer = new StreamsProducer(
             eosBetaConfig,
-            "threadId",
+            "threadId-StreamThread-0",
             clientSupplier,
             null,
             UUID.randomUUID(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
index b6dd6e1..bd2ebc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
@@ -637,13 +637,14 @@ public class RelationalSmokeTest extends SmokeTestUtil {
         public static Properties getConfig(final String broker,
                                            final String application,
                                            final String id,
+                                           final String processingGuarantee,
                                            final String stateDir) {
             return mkProperties(
                 mkMap(
                     mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker),
                     mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, application),
                     mkEntry(StreamsConfig.CLIENT_ID_CONFIG, id),
-                    mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE),
+                    mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee),
                     mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000"),
                     mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
                     mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir)
@@ -654,9 +655,10 @@ public class RelationalSmokeTest extends SmokeTestUtil {
         public static KafkaStreams startSync(final String broker,
                                              final String application,
                                              final String id,
+                                             final String processingGuarantee,
                                              final String stateDir) throws InterruptedException {
             final KafkaStreams kafkaStreams =
-                new KafkaStreams(getTopology(), getConfig(broker, application, id, stateDir));
+                new KafkaStreams(getTopology(), getConfig(broker, application, id, processingGuarantee, stateDir));
             final CountDownLatch startUpLatch = new CountDownLatch(1);
             kafkaStreams.setStateListener((newState, oldState) -> {
                 if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
@@ -983,8 +985,9 @@ public class RelationalSmokeTest extends SmokeTestUtil {
                 }
                 case "application": {
                     final String nodeId = args[2];
-                    final String stateDir = args[3];
-                    App.startSync(kafka, UUID.randomUUID().toString(), nodeId, stateDir);
+                    final String processingGuarantee = args[3];
+                    final String stateDir = args[4];
+                    App.startSync(kafka, UUID.randomUUID().toString(), nodeId, processingGuarantee, stateDir);
                     break;
                 }
                 default:
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java
index ea65f4a..a8f1186 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
@@ -31,6 +32,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 public class RelationalSmokeTestTest extends SmokeTestUtil {
+
     @Test
     public void verifySmokeTestLogic() {
         try (final TopologyTestDriver driver =
@@ -39,6 +41,7 @@ public class RelationalSmokeTestTest extends SmokeTestUtil {
                                             "nothing:0",
                                             "test",
                                             "test",
+                                            StreamsConfig.AT_LEAST_ONCE,
                                             TestUtils.tempDirectory().getAbsolutePath()
                                         ))) {
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index ea60676..933fb7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -49,12 +49,25 @@ public class StreamsSmokeTest {
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
         final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
 
         if (kafka == null) {
             System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
             System.exit(1);
         }
 
+        if ("process".equals(command)) {
+            if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
+                !StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) &&
+                !StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee)) {
+
+                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + ", " +
+                    StreamsConfig.EXACTLY_ONCE + ", or " + StreamsConfig.EXACTLY_ONCE_BETA);
+
+                System.exit(1);
+            }
+        }
+
         System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
         System.out.println("props=" + streamsProperties);
@@ -79,11 +92,6 @@ public class StreamsSmokeTest {
                 // this starts the stream processing app
                 new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
                 break;
-            case "process-eos":
-                // this starts the stream processing app with EOS
-                streamsProperties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-                new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
-                break;
             case "close-deadlock-test":
                 final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
                 test.start();
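
With the dedicated "process-eos" command removed, the processing guarantee is now carried in the properties file that StreamsSmokeTest loads, using the key added to streams_property.py below. A sketch of such a file, written roughly the way the Python service's prop_file() assembles it (all values illustrative):

    # keys follow tests/kafkatest/services/streams_property.py; values are illustrative
    props = {
        "bootstrap.servers": "localhost:9092",
        "state.dir": "/mnt/streams",
        "num.stream.threads": 3,
        # one of: at_least_once, exactly_once, exactly_once_beta
        "processing.guarantee": "exactly_once_beta",
    }

    with open("streams.properties", "w") as f:
        for key, value in props.items():
            f.write("%s=%s\n" % (key, value))
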
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 2619eb7..59f24a0 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -302,16 +302,18 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
 class StreamsSmokeTestBaseService(StreamsTestBaseService):
     """Base class for Streams Smoke Test services providing some common settings and functionality"""
 
-    def __init__(self, test_context, kafka, command, num_threads = 3):
+    def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
         super(StreamsSmokeTestBaseService, self).__init__(test_context,
                                                           kafka,
                                                           "org.apache.kafka.streams.tests.StreamsSmokeTest",
                                                           command)
         self.NUM_THREADS = num_threads
+        self.PROCESSING_GUARANTEE = processing_guarantee
 
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
                       streams_property.NUM_THREADS: self.NUM_THREADS}
 
         cfg = KafkaConfig(**properties)
@@ -359,13 +361,8 @@ class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
         return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
-    def __init__(self, test_context, kafka, num_threads = 3):
-        super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", num_threads)
-
-class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos")
-
+    def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3):
+        super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, num_threads)
 
 class StreamsEosTestDriverService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
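
With StreamsSmokeTestEOSJobRunnerService folded into StreamsSmokeTestJobRunnerService, a test now picks the guarantee at construction time. A hypothetical usage sketch, assuming a ducktape test context and a running Kafka service as in the system tests:

    from kafkatest.services.streams import StreamsSmokeTestJobRunnerService

    def start_processor(test_context, kafka, processing_guarantee):
        # processing_guarantee: 'at_least_once', 'exactly_once', or 'exactly_once_beta'
        processor = StreamsSmokeTestJobRunnerService(test_context, kafka,
                                                     processing_guarantee,
                                                     num_threads=3)
        processor.start()
        return processor
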
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py
index 99f0ece..8900adb 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/services/streams_property.py
@@ -20,3 +20,4 @@ Define Streams configuration property names here.
 STATE_DIR = "state.dir"
 KAFKA_SERVERS = "bootstrap.servers"
 NUM_THREADS = "num.stream.threads"
+PROCESSING_GUARANTEE = "processing.guarantee"
diff --git a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
index 2cdd480..a078f5e 100644
--- a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
@@ -22,7 +22,7 @@ from kafkatest.tests.kafka_test import KafkaTest
 
 
 class StreamsRelationalSmokeTestService(StreamsTestBaseService):
-    def __init__(self, test_context, kafka, mode, nodeId):
+    def __init__(self, test_context, kafka, mode, nodeId, processing_guarantee):
         super(StreamsRelationalSmokeTestService, self).__init__(
             test_context,
             kafka,
@@ -31,18 +31,20 @@ class StreamsRelationalSmokeTestService(StreamsTestBaseService):
         )
         self.mode = mode
         self.nodeId = nodeId
+        self.processing_guarantee = processing_guarantee
         self.log4j_template = 'log4j_template.properties'
 
     def start_cmd(self, node):
         return "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
                "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.tests.RelationalSmokeTest " \
-               " %(mode)s %(kafka)s %(nodeId)s %(state_dir)s" \
+               " %(mode)s %(kafka)s %(nodeId)s %(processing_guarantee)s %(state_dir)s" \
                " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % {
                    "log4j": self.LOG4J_CONFIG_FILE,
                    "kafka_run_class": self.path.script("kafka-run-class.sh", node),
                    "mode": self.mode,
-                   "nodeId": self.nodeId,
                    "kafka": self.kafka.bootstrap_servers(),
+                   "nodeId": self.nodeId,
+                   "processing_guarantee": self.processing_guarantee,
                    "state_dir": self.PERSISTENT_ROOT,
                    "stdout": self.STDOUT_FILE,
                    "stderr": self.STDERR_FILE,
@@ -82,14 +84,15 @@ class StreamsRelationalSmokeTest(KafkaTest):
         self.test_context = test_context
 
     @cluster(num_nodes=8)
-    @matrix(crash=[False, True])
-    def test_streams(self, crash):
-        driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored")
+    @matrix(crash=[False, True],
+            processing_guarantee=['exactly_once', 'exactly_once_beta'])
+    def test_streams(self, crash, processing_guarantee):
+        driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
 
         LOG_FILE = driver.LOG_FILE  # this is the same for all instaces of the service, so we can just declare a "constant"
 
-        processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1")
-        processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2")
+        processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1", processing_guarantee)
+        processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2", processing_guarantee)
 
         processor1.start()
         processor2.start()
@@ -104,7 +107,7 @@ class StreamsRelationalSmokeTest(KafkaTest):
 
         processor1.stop_nodes(not crash)
 
-        processor3 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor3")
+        processor3 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor3", processing_guarantee)
         processor3.start()
         processor3.await_command("grep -q 'Streams has started' %s" % LOG_FILE)
 
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 094869b..1a4f296 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -13,13 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsSmokeTestEOSJobRunnerService
-
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 
 class StreamsSmokeTest(KafkaTest):
     """
@@ -48,19 +46,11 @@ class StreamsSmokeTest(KafkaTest):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
 
     @cluster(num_nodes=8)
-    @matrix(eos=[True, False], crash=[True, False])
-    def test_streams(self, eos, crash):
-        #
-        if eos:
-            processor1 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
-            processor2 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
-            processor3 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
-        else:
-            processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-            processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-            processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-
-
+    @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False])
+    def test_streams(self, processing_guarantee, crash):
+        processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
+        processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
+        processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
 
         with processor1.node.account.monitor_log(processor1.STDOUT_FILE) as monitor1:
             processor1.start()
@@ -114,7 +104,7 @@ class StreamsSmokeTest(KafkaTest):
 
         processor3.stop()
 
-        if crash and not eos:
+        if crash and processing_guarantee == 'at_least_once':
             self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
         else:
             self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
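
The final grep encodes the expected outcome per guarantee: only an at-least-once run that crashed may legitimately report duplicates. A small sketch of that decision, mirroring the branch above (illustrative only):

    def expected_driver_pattern(crash, processing_guarantee):
        # duplicates ("PROCESSED-MORE-THAN-GENERATED") are tolerated only without EOS
        if crash and processing_guarantee == 'at_least_once':
            return 'SUCCESS|PROCESSED-MORE-THAN-GENERATED'
        return 'SUCCESS'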