You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 03:55:55 UTC
[kafka] branch 2.5 updated: KAFKA-9127: don't create StreamThreads
for global-only topology (#8540)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 9e2785f KAFKA-9127: don't create StreamThreads for global-only topology (#8540)
9e2785f is described below
commit 9e2785fd1ba0ed16604e01058bae6b60ff9f3d96
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Apr 28 20:34:17 2020 -0700
KAFKA-9127: don't create StreamThreads for global-only topology (#8540)
Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <vv...@apache.org>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 35 +++++--
.../internals/InternalTopologyBuilder.java | 4 +
.../org/apache/kafka/streams/KafkaStreamsTest.java | 105 +++++++++++++++------
.../integration/GlobalKTableIntegrationTest.java | 18 ++++
5 files changed, 128 insertions(+), 36 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7b85455..bb5cf46 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -177,7 +177,7 @@
files="StreamsPartitionAssignor.java"/>
<suppress checks="NPathComplexity"
- files="(ProcessorStateManager|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
+ files="(AssignorConfiguration|InternalTopologyBuilder|KafkaStreams|ProcessorStateManager|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>
<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 67d185d..43a5341 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.KStream;
@@ -56,6 +57,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyTask;
@@ -489,8 +491,9 @@ public class KafkaStreams implements AutoCloseable {
final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
globalThreadState = newState;
- // special case when global thread is dead
- if (newState == GlobalStreamThread.State.DEAD) {
+ if (newState == GlobalStreamThread.State.RUNNING) {
+ maybeSetRunning();
+ } else if (newState == GlobalStreamThread.State.DEAD) {
if (setState(State.ERROR)) {
log.error("Global thread has died. The instance will be in error state and should be closed.");
}
@@ -706,18 +709,34 @@ public class KafkaStreams implements AutoCloseable {
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+ final int numStreamThreads;
+ if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+ log.info("Overriding number of StreamThreads to zero for global-only topology");
+ numStreamThreads = 0;
+ } else {
+ numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+ }
+
// create the stream thread, global update thread, and cleanup thread
- threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+ threads = new StreamThread[numStreamThreads];
+
+ final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ final boolean hasGlobalTopology = globalTaskTopology != null;
+
+ if (numStreamThreads == 0 && !hasGlobalTopology) {
+ log.error("Topology with no input topics will create no stream threads and no global thread.");
+ throw new TopologyException("Topology has no stream threads and no global threads, " +
+ "must subscribe to at least one source topic or global table.");
+ }
long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
if (totalCacheSize < 0) {
totalCacheSize = 0;
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
- final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
- final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
+ final long cacheSizePerThread = totalCacheSize / (threads.length + (hasGlobalTopology ? 1 : 0));
final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
- (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
+ (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
try {
stateDirectory = new StateDirectory(config, time, hasPersistentStores);
@@ -727,7 +746,7 @@ public class KafkaStreams implements AutoCloseable {
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
- if (globalTaskTopology != null) {
+ if (hasGlobalTopology) {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(
globalTaskTopology,
@@ -768,7 +787,7 @@ public class KafkaStreams implements AutoCloseable {
}
final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
- if (globalTaskTopology != null) {
+ if (hasGlobalTopology) {
globalStreamThread.setStateListener(streamStateListener);
}
for (final StreamThread thread : threads) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index bdbf9cb..f36324d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1240,6 +1240,10 @@ public class InternalTopologyBuilder {
return topicPattern;
}
+ public boolean hasNoNonGlobalTopology() {
+ return !usesPatternSubscription() && sourceTopicCollection().isEmpty();
+ }
+
private boolean isGlobalSource(final String nodeName) {
final NodeFactory nodeFactory = nodeFactories.get(nodeName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index d34a09d..c8a22e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -88,6 +89,7 @@ import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -109,7 +111,7 @@ public class KafkaStreamsTest {
private MockTime time;
private Properties props;
-
+
@Mock
private StateDirectory stateDirectory;
@Mock
@@ -326,7 +328,7 @@ public class KafkaStreamsTest {
@Test
public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.close();
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
@@ -334,7 +336,7 @@ public class KafkaStreamsTest {
@Test
public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.setStateListener(streamsStateListener);
Assert.assertEquals(0, streamsStateListener.numChanges);
@@ -402,7 +404,7 @@ public class KafkaStreamsTest {
@Test
public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.setStateListener(streamsStateListener);
Assert.assertEquals(0, streamsStateListener.numChanges);
@@ -466,7 +468,7 @@ public class KafkaStreamsTest {
@Test
public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
@@ -486,7 +488,7 @@ public class KafkaStreamsTest {
@Test
public void testStateThreadClose() throws Exception {
// make sure we have the global state thread running too
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
@@ -523,7 +525,7 @@ public class KafkaStreamsTest {
@Test
public void testStateGlobalThreadClose() throws Exception {
// make sure we have the global state thread running too
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
@@ -551,7 +553,7 @@ public class KafkaStreamsTest {
public void testInitializesAndDestroysMetricsReporters() {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
final int initDiff = newInitCount - oldInitCount;
assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
@@ -565,7 +567,7 @@ public class KafkaStreamsTest {
@Test
public void testCloseIsIdempotent() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.close();
final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
@@ -576,7 +578,7 @@ public class KafkaStreamsTest {
@Test
public void testCannotStartOnceClosed() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
streams.close();
try {
@@ -591,7 +593,7 @@ public class KafkaStreamsTest {
@Test
public void shouldNotSetGlobalRestoreListenerAfterStarting() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
try {
streams.setGlobalStateRestoreListener(null);
@@ -605,7 +607,7 @@ public class KafkaStreamsTest {
@Test
public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
try {
streams.setUncaughtExceptionHandler(null);
@@ -617,7 +619,7 @@ public class KafkaStreamsTest {
@Test
public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
try {
streams.setStateListener(null);
@@ -629,7 +631,7 @@ public class KafkaStreamsTest {
@Test
public void shouldAllowCleanupBeforeStartAndAfterClose() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
try {
streams.cleanUp();
streams.start();
@@ -641,7 +643,7 @@ public class KafkaStreamsTest {
@Test
public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
@@ -657,32 +659,32 @@ public class KafkaStreamsTest {
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.allMetadata();
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.allMetadataForStore("store");
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.queryMetadataForKey("store", "key", Serdes.String().serializer());
}
@Test
public void shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
}
@@ -702,7 +704,7 @@ public class KafkaStreamsTest {
EasyMock.expect(mockClientSupplier.getAdmin(anyObject())).andReturn(mockAdminClient);
EasyMock.replay(result, mockAdminClient, mockClientSupplier);
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, mockClientSupplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, mockClientSupplier, time);
streams.start();
assertEquals(0, streams.allLocalStorePartitionLags().size());
}
@@ -710,21 +712,21 @@ public class KafkaStreamsTest {
@Test
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
// do not use mock time so that it can really elapse
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(Duration.ofMillis(10L)));
}
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnNegativeTimeoutForClose() {
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.close(Duration.ofMillis(-1L));
}
}
@Test
public void shouldNotBlockInCloseForZeroDuration() {
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
// with mock time that does not elapse, close would not return if it ever waits on the state transition
assertFalse(streams.close(Duration.ZERO));
}
@@ -757,7 +759,7 @@ public class KafkaStreamsTest {
builder.table("topic", Materialized.as("store"));
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.DEBUG.name());
- try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
}
@@ -780,7 +782,7 @@ public class KafkaStreamsTest {
builder.table("topic", Materialized.as("store"));
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.INFO.name());
- try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
}
@@ -800,7 +802,7 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
LogCaptureAppender.unregister(appender);
assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
@@ -882,6 +884,49 @@ public class KafkaStreamsTest {
startStreamsAndCheckDirExists(topology, true);
}
+ @Test
+ public void shouldThrowTopologyExceptionOnEmptyTopology() {
+ try {
+ new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ fail("Should have thrown TopologyException");
+ } catch (final TopologyException e) {
+ assertThat(
+ e.getMessage(),
+ equalTo("Invalid topology: Topology has no stream threads and no global threads, " +
+ "must subscribe to at least one source topic or global table."));
+ }
+ }
+
+ @Test
+ public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.globalTable("anyTopic");
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+ assertThat(streams.threads.length, equalTo(0));
+ }
+
+ @Test
+ public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedException {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.globalTable("anyTopic");
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+ assertThat(streams.threads.length, equalTo(0));
+ assertEquals(streams.state(), KafkaStreams.State.CREATED);
+
+ streams.start();
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.RUNNING,
+ "Streams never started, state is " + streams.state());
+
+ streams.close();
+
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
+ "Streams never stopped.");
+ }
+
@SuppressWarnings("unchecked")
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
@@ -923,6 +968,12 @@ public class KafkaStreamsTest {
new MockProcessorSupplier());
return topology;
}
+
+ private StreamsBuilder getBuilderWithSource() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream("source-topic");
+ return builder;
+ }
private void startStreamsAndCheckDirExists(final Topology topology,
final boolean shouldFilesExist) throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index d9d6e9c..552a4e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import java.time.Duration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -23,6 +24,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -55,6 +57,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@@ -268,6 +272,20 @@ public class GlobalKTableIntegrationTest {
assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
}
+ @Test
+ public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
+ builder = new StreamsBuilder();
+ globalTable = builder.globalTable(
+ globalTableTopic,
+ Consumed.with(Serdes.Long(), Serdes.String()),
+ Materialized.as(Stores.inMemoryKeyValueStore(globalStore)));
+
+ startStreams();
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
+
+ kafkaStreams.close();
+ }
+
private void createTopics() throws Exception {
streamTopic = "stream-" + testNo;
globalTableTopic = "globalTable-" + testNo;