You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/08 19:10:27 UTC
[kafka] branch 2.4 updated: KAFKA-9127: don't create StreamThreads
for global-only topology (2.4) (#8616)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 8d54432 KAFKA-9127: don't create StreamThreads for global-only topology (2.4) (#8616)
8d54432 is described below
commit 8d54432437378cad873ef15ca3b4718d7f1c1152
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri May 8 12:09:41 2020 -0700
KAFKA-9127: don't create StreamThreads for global-only topology (2.4) (#8616)
Backports: https://github.com/apache/kafka/pull/8540
Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <vv...@apache.org>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 40 ++++++--
.../internals/InternalTopologyBuilder.java | 4 +
.../org/apache/kafka/streams/KafkaStreamsTest.java | 101 ++++++++++++++++-----
.../integration/GlobalKTableIntegrationTest.java | 18 ++++
.../integration/utils/IntegrationTestUtils.java | 27 ++++++
6 files changed, 156 insertions(+), 36 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 941dd1b..c8678ac 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -175,7 +175,7 @@
files="StreamsPartitionAssignor.java"/>
<suppress checks="NPathComplexity"
- files="(ProcessorStateManager|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
+ files="(ProcessorStateManager|InternalTopologyBuilder|KafkaStreams|StreamsPartitionAssignor|StreamThread).java"/>
<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 621bb91..3a22e14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.KStream;
@@ -48,6 +49,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
+import org.apache.kafka.streams.processor.internals.GlobalStreamThread.State;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
@@ -480,8 +482,9 @@ public class KafkaStreams implements AutoCloseable {
final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
globalThreadState = newState;
- // special case when global thread is dead
- if (newState == GlobalStreamThread.State.DEAD) {
+ if (newState == GlobalStreamThread.State.RUNNING) {
+ maybeSetRunning();
+ } else if (newState == GlobalStreamThread.State.DEAD) {
if (setState(State.ERROR)) {
log.error("Global thread has died. The instance will be in error state and should be closed.");
}
@@ -696,28 +699,45 @@ public class KafkaStreams implements AutoCloseable {
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+ final int numStreamThreads;
+ if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+ log.info("Overriding number of StreamThreads to zero for global-only topology");
+ numStreamThreads = 0;
+ } else {
+ numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+ }
+
// create the stream thread, global update thread, and cleanup thread
- threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+ threads = new StreamThread[numStreamThreads];
+
+ final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ final boolean hasGlobalTopology = globalTaskTopology != null;
+
+ if (numStreamThreads == 0 && !hasGlobalTopology) {
+ log.error("Topology with no input topics will create no stream threads and no global thread.");
+ throw new TopologyException("Topology has no stream threads and no global threads, " +
+ "must subscribe to at least one source topic or global table.");
+ }
long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
if (totalCacheSize < 0) {
totalCacheSize = 0;
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
- final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
- final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
- final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
- (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
+
+ final long cacheSizePerThread = totalCacheSize / (threads.length + (hasGlobalTopology ? 1 : 0));
+ final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+ (hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
try {
- stateDirectory = new StateDirectory(config, time, createStateDirectory);
+ stateDirectory = new StateDirectory(config, time, hasPersistentStores);
} catch (final ProcessorStateException fatal) {
throw new StreamsException(fatal);
}
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
- if (globalTaskTopology != null) {
+ if (hasGlobalTopology) {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(
globalTaskTopology,
@@ -758,7 +778,7 @@ public class KafkaStreams implements AutoCloseable {
}
final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
- if (globalTaskTopology != null) {
+ if (hasGlobalTopology) {
globalStreamThread.setStateListener(streamStateListener);
}
for (final StreamThread thread : threads) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index d28aee8..f11536a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1235,6 +1235,10 @@ public class InternalTopologyBuilder {
setRegexMatchedTopicToStateStore();
}
+ public boolean hasNoNonGlobalTopology() {
+ return sourceTopicNames.isEmpty() && nodeToSourcePatterns.isEmpty();
+ }
+
private boolean isGlobalSource(final String nodeName) {
final NodeFactory nodeFactory = nodeFactories.get(nodeName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index cd24685..8124e3e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -83,6 +84,7 @@ import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -104,7 +106,7 @@ public class KafkaStreamsTest {
private MockTime time;
private Properties props;
-
+
@Mock
private StateDirectory stateDirectory;
@Mock
@@ -317,7 +319,7 @@ public class KafkaStreamsTest {
@Test
public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.close();
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
@@ -325,7 +327,7 @@ public class KafkaStreamsTest {
@Test
public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.setStateListener(streamsStateListener);
Assert.assertEquals(0, streamsStateListener.numChanges);
@@ -393,7 +395,7 @@ public class KafkaStreamsTest {
@Test
public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.setStateListener(streamsStateListener);
Assert.assertEquals(0, streamsStateListener.numChanges);
@@ -457,7 +459,7 @@ public class KafkaStreamsTest {
@Test
public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
@@ -477,7 +479,7 @@ public class KafkaStreamsTest {
@Test
public void testStateThreadClose() throws Exception {
// make sure we have the global state thread running too
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
@@ -514,7 +516,7 @@ public class KafkaStreamsTest {
@Test
public void testStateGlobalThreadClose() throws Exception {
// make sure we have the global state thread running too
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
@@ -542,7 +544,7 @@ public class KafkaStreamsTest {
public void testInitializesAndDestroysMetricsReporters() {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
final int initDiff = newInitCount - oldInitCount;
assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
@@ -556,7 +558,7 @@ public class KafkaStreamsTest {
@Test
public void testCloseIsIdempotent() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.close();
final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
@@ -567,7 +569,7 @@ public class KafkaStreamsTest {
@Test
public void testCannotStartOnceClosed() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
streams.close();
try {
@@ -582,7 +584,7 @@ public class KafkaStreamsTest {
@Test
public void shouldNotSetGlobalRestoreListenerAfterStarting() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
try {
streams.setGlobalStateRestoreListener(null);
@@ -596,7 +598,7 @@ public class KafkaStreamsTest {
@Test
public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
try {
streams.setUncaughtExceptionHandler(null);
@@ -608,7 +610,7 @@ public class KafkaStreamsTest {
@Test
public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
try {
streams.setStateListener(null);
@@ -620,7 +622,7 @@ public class KafkaStreamsTest {
@Test
public void shouldAllowCleanupBeforeStartAndAfterClose() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
try {
streams.cleanUp();
streams.start();
@@ -632,7 +634,7 @@ public class KafkaStreamsTest {
@Test
public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
@@ -648,46 +650,46 @@ public class KafkaStreamsTest {
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.allMetadata();
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.allMetadataForStore("store");
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.metadataForKey("store", "key", Serdes.String().serializer());
}
@Test(expected = IllegalStateException.class)
public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
- final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
}
@Test
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
// do not use mock time so that it can really elapse
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(Duration.ofMillis(10L)));
}
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnNegativeTimeoutForClose() {
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.close(Duration.ofMillis(-1L));
}
}
@Test
public void shouldNotBlockInCloseForZeroDuration() {
- try (final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
// with mock time that does not elapse, close would not return if it ever waits on the state transition
assertFalse(streams.close(Duration.ZERO));
}
@@ -720,7 +722,7 @@ public class KafkaStreamsTest {
builder.table("topic", Materialized.as("store"));
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.DEBUG.name());
- try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
}
@@ -743,7 +745,7 @@ public class KafkaStreamsTest {
builder.table("topic", Materialized.as("store"));
props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, RecordingLevel.INFO.name());
- try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
}
@@ -763,7 +765,7 @@ public class KafkaStreamsTest {
props.setProperty(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, TestRocksDbConfigSetter.class.getName());
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
LogCaptureAppender.unregister(appender);
assertThat(appender.getMessages(), hasItem("stream-client [" + CLIENT_ID + "] "
@@ -845,6 +847,49 @@ public class KafkaStreamsTest {
startStreamsAndCheckDirExists(topology, true);
}
+ @Test
+ public void shouldThrowTopologyExceptionOnEmptyTopology() {
+ try {
+ new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+ fail("Should have thrown TopologyException");
+ } catch (final TopologyException e) {
+ assertThat(
+ e.getMessage(),
+ equalTo("Invalid topology: Topology has no stream threads and no global threads, " +
+ "must subscribe to at least one source topic or global table."));
+ }
+ }
+
+ @Test
+ public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.globalTable("anyTopic");
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+ assertThat(streams.threads.length, equalTo(0));
+ }
+
+ @Test
+ public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedException {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.globalTable("anyTopic");
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+ assertThat(streams.threads.length, equalTo(0));
+ assertEquals(streams.state(), KafkaStreams.State.CREATED);
+
+ streams.start();
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.RUNNING,
+ "Streams never started, state is " + streams.state());
+
+ streams.close();
+
+ TestUtils.waitForCondition(
+ () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
+ "Streams never stopped.");
+ }
+
@SuppressWarnings("unchecked")
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
@@ -886,6 +931,12 @@ public class KafkaStreamsTest {
new MockProcessorSupplier());
return topology;
}
+
+ private StreamsBuilder getBuilderWithSource() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream("source-topic");
+ return builder;
+ }
private void startStreamsAndCheckDirExists(final Topology topology,
final boolean shouldFilesExist) throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 0a9148d..e33bdd1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import java.time.Duration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
@@ -23,6 +24,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -54,6 +56,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@@ -267,6 +271,20 @@ public class GlobalKTableIntegrationTest {
assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
}
+ @Test
+ public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
+ builder = new StreamsBuilder();
+ globalTable = builder.globalTable(
+ globalTableTopic,
+ Consumed.with(Serdes.Long(), Serdes.String()),
+ Materialized.as(Stores.inMemoryKeyValueStore(globalStore)));
+
+ startStreams();
+ waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
+
+ kafkaStreams.close();
+ }
+
private void createTopics() throws Exception {
streamTopic = "stream-" + testNo;
globalTableTopic = "globalTable-" + testNo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 4921c4f..29e584c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -809,6 +809,33 @@ public class IntegrationTestUtils {
}
}
+ /**
+ * Waits for the given {@link KafkaStreams} instances to all reach the given {@link State}.
+ * Prefer {@link #startApplicationAndWaitUntilRunning(List, Duration)} when possible
+ * because this method uses polling, which can be more error-prone and slightly slower.
+ *
+ * @param streamsList the list of streams instances whose state to check.
+ * @param state the {@link State} that each streams instance is expected to reach.
+ * @param timeout the time to wait for all the streams instances to reach the expected state.
+ */
+ public static void waitForApplicationState(final List<KafkaStreams> streamsList,
+ final State state,
+ final Duration timeout) throws InterruptedException {
+ retryOnExceptionWithTimeout(timeout.toMillis(), () -> {
+ final Map<KafkaStreams, State> streamsToStates = streamsList
+ .stream()
+ .collect(Collectors.toMap(stream -> stream, KafkaStreams::state));
+
+ final Map<KafkaStreams, State> wrongStateMap = streamsToStates.entrySet()
+ .stream()
+ .filter(entry -> entry.getValue() != state)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ final String reason = String.format("Expected all streams instances in %s to be %s within %d ms, but the following were not: %s",
+ streamsList, state, timeout.toMillis(), wrongStateMap);
+ assertThat(reason, wrongStateMap.isEmpty());
+ });
+ }
+
private static StateListener getStateListener(final KafkaStreams streams) {
try {
final Field field = streams.getClass().getDeclaredField("stateListener");