You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/04/10 18:55:28 UTC
[kafka] branch trunk updated: KAFKA-9832: Extend Streams system tests for EOS-beta (#8443)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 20e4a74 KAFKA-9832: Extend Streams system tests for EOS-beta (#8443)
20e4a74 is described below
commit 20e4a74c3579837df7b46ce1a5dcea37b6e1e452
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Apr 10 11:55:01 2020 -0700
KAFKA-9832: Extend Streams system tests for EOS-beta (#8443)
Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../processor/internals/ActiveTaskCreator.java | 3 ---
.../processor/internals/StreamsProducer.java | 4 +++-
.../processor/internals/StreamsProducerTest.java | 18 ++++++++--------
.../kafka/streams/tests/RelationalSmokeTest.java | 11 ++++++----
.../streams/tests/RelationalSmokeTestTest.java | 3 +++
.../kafka/streams/tests/StreamsSmokeTest.java | 18 +++++++++++-----
tests/kafkatest/services/streams.py | 13 +++++-------
tests/kafkatest/services/streams_property.py | 1 +
.../tests/streams/streams_relational_smoke_test.py | 21 +++++++++++--------
.../kafkatest/tests/streams/streams_smoke_test.py | 24 +++++++---------------
10 files changed, 60 insertions(+), 56 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index ad1a3cc..5473d7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -62,7 +62,6 @@ class ActiveTaskCreator {
private final StreamsProducer threadProducer;
private final Map<TaskId, StreamsProducer> taskProducers;
private final StreamThread.ProcessingMode processingMode;
- private final String transactionalId;
ActiveTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
@@ -87,8 +86,6 @@ class ActiveTaskCreator {
this.log = log;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
- final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- transactionalId = applicationId + "-" + processId + "-StreamThread-" + threadId.split("-StreamThread-")[1];
processingMode = StreamThread.processingMode(config);
if (processingMode == EXACTLY_ONCE_ALPHA) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 5f3c386..c39ff20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -113,7 +113,9 @@ public class StreamsProducer {
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
producerConfigs.put(
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
- applicationId + "-" + Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta"));
+ applicationId + "-" +
+ Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta") +
+ "-" + threadId.split("-StreamThread-")[1]);
eosBetaProducerConfigs = producerConfigs;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index 12779d4..c46a4c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -142,7 +142,7 @@ public class StreamsProducerTest {
nonEosStreamsProducer =
new StreamsProducer(
nonEosConfig,
- "threadId",
+ "threadId-StreamThread-0",
mockClientSupplier,
null,
null,
@@ -155,7 +155,7 @@ public class StreamsProducerTest {
eosAlphaStreamsProducer =
new StreamsProducer(
eosAlphaConfig,
- "threadId",
+ "threadId-StreamThread-0",
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
@@ -169,7 +169,7 @@ public class StreamsProducerTest {
eosBetaStreamsProducer =
new StreamsProducer(
eosBetaConfig,
- "threadId",
+ "threadId-StreamThread-0",
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
@@ -470,11 +470,11 @@ public class StreamsProducerTest {
final UUID processId = UUID.randomUUID();
final Map<String, Object> mockMap = mock(Map.class);
- expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + processId)).andReturn(null);
+ expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + processId + "-0")).andReturn(null);
expect(mockMap.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn("appId-" + processId);
final StreamsConfig mockConfig = mock(StreamsConfig.class);
- expect(mockConfig.getProducerConfigs("threadId-producer")).andReturn(mockMap);
+ expect(mockConfig.getProducerConfigs("threadId-StreamThread-0-producer")).andReturn(mockMap);
expect(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("appId");
expect(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).andReturn(StreamsConfig.EXACTLY_ONCE_BETA).anyTimes();
@@ -482,7 +482,7 @@ public class StreamsProducerTest {
new StreamsProducer(
mockConfig,
- "threadId",
+ "threadId-StreamThread-0",
eosAlphaMockClientSupplier,
null,
processId,
@@ -605,7 +605,7 @@ public class StreamsProducerTest {
final StreamsProducer streamsProducer = new StreamsProducer(
eosBetaConfig,
- "threadId",
+ "threadId-StreamThread-0",
clientSupplier,
null,
UUID.randomUUID(),
@@ -737,7 +737,7 @@ public class StreamsProducerTest {
final StreamsProducer streamsProducer =
new StreamsProducer(
eosBetaConfig,
- "threadId",
+ "threadId-StreamThread-0",
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
@@ -1076,7 +1076,7 @@ public class StreamsProducerTest {
public void shouldResetTransactionInitializedOnResetProducer() {
final StreamsProducer streamsProducer = new StreamsProducer(
eosBetaConfig,
- "threadId",
+ "threadId-StreamThread-0",
clientSupplier,
null,
UUID.randomUUID(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
index b6dd6e1..bd2ebc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTest.java
@@ -637,13 +637,14 @@ public class RelationalSmokeTest extends SmokeTestUtil {
public static Properties getConfig(final String broker,
final String application,
final String id,
+ final String processingGuarantee,
final String stateDir) {
return mkProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, application),
mkEntry(StreamsConfig.CLIENT_ID_CONFIG, id),
- mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE),
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000"),
mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir)
@@ -654,9 +655,10 @@ public class RelationalSmokeTest extends SmokeTestUtil {
public static KafkaStreams startSync(final String broker,
final String application,
final String id,
+ final String processingGuarantee,
final String stateDir) throws InterruptedException {
final KafkaStreams kafkaStreams =
- new KafkaStreams(getTopology(), getConfig(broker, application, id, stateDir));
+ new KafkaStreams(getTopology(), getConfig(broker, application, id, processingGuarantee, stateDir));
final CountDownLatch startUpLatch = new CountDownLatch(1);
kafkaStreams.setStateListener((newState, oldState) -> {
if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
@@ -983,8 +985,9 @@ public class RelationalSmokeTest extends SmokeTestUtil {
}
case "application": {
final String nodeId = args[2];
- final String stateDir = args[3];
- App.startSync(kafka, UUID.randomUUID().toString(), nodeId, stateDir);
+ final String processingGuarantee = args[3];
+ final String stateDir = args[4];
+ App.startSync(kafka, UUID.randomUUID().toString(), nodeId, processingGuarantee, stateDir);
break;
}
default:
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java
index ea65f4a..a8f1186 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/RelationalSmokeTestTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
@@ -31,6 +32,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class RelationalSmokeTestTest extends SmokeTestUtil {
+
@Test
public void verifySmokeTestLogic() {
try (final TopologyTestDriver driver =
@@ -39,6 +41,7 @@ public class RelationalSmokeTestTest extends SmokeTestUtil {
"nothing:0",
"test",
"test",
+ StreamsConfig.AT_LEAST_ONCE,
TestUtils.tempDirectory().getAbsolutePath()
))) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index ea60676..933fb7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -49,12 +49,25 @@ public class StreamsSmokeTest {
final Properties streamsProperties = Utils.loadProps(propFileName);
final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+ final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
}
+ if ("process".equals(command)) {
+ if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
+ !StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) &&
+ !StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee)) {
+
+ System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + ", " +
+ StreamsConfig.EXACTLY_ONCE + ", or " + StreamsConfig.EXACTLY_ONCE_BETA);
+
+ System.exit(1);
+ }
+ }
+
System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("props=" + streamsProperties);
@@ -79,11 +92,6 @@ public class StreamsSmokeTest {
// this starts the stream processing app
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
break;
- case "process-eos":
- // this starts the stream processing app with EOS
- streamsProperties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
- new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
- break;
case "close-deadlock-test":
final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
test.start();
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 2619eb7..59f24a0 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -302,16 +302,18 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
class StreamsSmokeTestBaseService(StreamsTestBaseService):
"""Base class for Streams Smoke Test services providing some common settings and functionality"""
- def __init__(self, test_context, kafka, command, num_threads = 3):
+ def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
super(StreamsSmokeTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsSmokeTest",
command)
self.NUM_THREADS = num_threads
+ self.PROCESSING_GUARANTEE = processing_guarantee
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+ streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
streams_property.NUM_THREADS: self.NUM_THREADS}
cfg = KafkaConfig(**properties)
@@ -359,13 +361,8 @@ class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
- def __init__(self, test_context, kafka, num_threads = 3):
- super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", num_threads)
-
-class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
- def __init__(self, test_context, kafka):
- super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos")
-
+ def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3):
+ super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, num_threads)
class StreamsEosTestDriverService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py
index 99f0ece..8900adb 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/services/streams_property.py
@@ -20,3 +20,4 @@ Define Streams configuration property names here.
STATE_DIR = "state.dir"
KAFKA_SERVERS = "bootstrap.servers"
NUM_THREADS = "num.stream.threads"
+PROCESSING_GUARANTEE = "processing.guarantee"
diff --git a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
index 2cdd480..a078f5e 100644
--- a/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_relational_smoke_test.py
@@ -22,7 +22,7 @@ from kafkatest.tests.kafka_test import KafkaTest
class StreamsRelationalSmokeTestService(StreamsTestBaseService):
- def __init__(self, test_context, kafka, mode, nodeId):
+ def __init__(self, test_context, kafka, mode, nodeId, processing_guarantee):
super(StreamsRelationalSmokeTestService, self).__init__(
test_context,
kafka,
@@ -31,18 +31,20 @@ class StreamsRelationalSmokeTestService(StreamsTestBaseService):
)
self.mode = mode
self.nodeId = nodeId
+ self.processing_guarantee = processing_guarantee
self.log4j_template = 'log4j_template.properties'
def start_cmd(self, node):
return "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.tests.RelationalSmokeTest " \
- " %(mode)s %(kafka)s %(nodeId)s %(state_dir)s" \
+ " %(mode)s %(kafka)s %(nodeId)s %(processing_guarantee)s %(state_dir)s" \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % {
"log4j": self.LOG4J_CONFIG_FILE,
"kafka_run_class": self.path.script("kafka-run-class.sh", node),
"mode": self.mode,
- "nodeId": self.nodeId,
"kafka": self.kafka.bootstrap_servers(),
+ "nodeId": self.nodeId,
+ "processing_guarantee": self.processing_guarantee,
"state_dir": self.PERSISTENT_ROOT,
"stdout": self.STDOUT_FILE,
"stderr": self.STDERR_FILE,
@@ -82,14 +84,15 @@ class StreamsRelationalSmokeTest(KafkaTest):
self.test_context = test_context
@cluster(num_nodes=8)
- @matrix(crash=[False, True])
- def test_streams(self, crash):
- driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored")
+ @matrix(crash=[False, True],
+ processing_guarantee=['exactly_once', 'exactly_once_beta'])
+ def test_streams(self, crash, processing_guarantee):
+ driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
LOG_FILE = driver.LOG_FILE # this is the same for all instaces of the service, so we can just declare a "constant"
- processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1")
- processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2")
+ processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1", processing_guarantee)
+ processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2", processing_guarantee)
processor1.start()
processor2.start()
@@ -104,7 +107,7 @@ class StreamsRelationalSmokeTest(KafkaTest):
processor1.stop_nodes(not crash)
- processor3 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor3")
+ processor3 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor3", processing_guarantee)
processor3.start()
processor3.await_command("grep -q 'Streams has started' %s" % LOG_FILE)
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 094869b..1a4f296 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -13,13 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsSmokeTestEOSJobRunnerService
-
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
class StreamsSmokeTest(KafkaTest):
"""
@@ -48,19 +46,11 @@ class StreamsSmokeTest(KafkaTest):
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
@cluster(num_nodes=8)
- @matrix(eos=[True, False], crash=[True, False])
- def test_streams(self, eos, crash):
- #
- if eos:
- processor1 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
- processor2 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
- processor3 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
- else:
- processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
- processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
- processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
-
-
+ @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False])
+ def test_streams(self, processing_guarantee, crash):
+ processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
+ processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
+ processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
with processor1.node.account.monitor_log(processor1.STDOUT_FILE) as monitor1:
processor1.start()
@@ -114,7 +104,7 @@ class StreamsSmokeTest(KafkaTest):
processor3.stop()
- if crash and not eos:
+ if crash and processing_guarantee == 'at_least_once':
self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
else:
self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)