You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/29 21:17:06 UTC
[kafka] branch trunk updated: MINOR: Move KTable source topic for changelog to optimization framework (#6500)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 369d89f MINOR: Move KTable source topic for changelog to optimization framework (#6500)
369d89f is described below
commit 369d89f2080586773b5a3cc432cbcf655aa1f625
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Mar 29 17:16:56 2019 -0400
MINOR: Move KTable source topic for changelog to optimization framework (#6500)
Since Kafka Streams optimizations were added in 2.1, we need to move the optimization for source KTable nodes (using the source topic as the changelog) into the optimization framework.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../kstream/internals/InternalStreamsBuilder.java | 9 ++++++
.../kstream/internals/graph/TableSourceNode.java | 12 +++++++-
.../internals/InternalTopologyBuilder.java | 36 +---------------------
.../apache/kafka/streams/StreamsBuilderTest.java | 2 +-
.../integration/RestoreIntegrationTest.java | 2 +-
5 files changed, 23 insertions(+), 38 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c06b988..920f213 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -61,6 +61,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
+ private final LinkedHashSet<StreamsGraphNode> tableSourceNodes = new LinkedHashSet<>();
private static final String TOPOLOGY_ROOT = "root";
private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -254,6 +255,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
} else if (node.isMergeNode()) {
mergeNodes.add(node);
+ } else if (node instanceof TableSourceNode) {
+ tableSourceNodes.add(node);
}
}
@@ -292,10 +295,16 @@ public class InternalStreamsBuilder implements InternalNameProvider {
if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+ optimizeKTableSourceTopics();
maybeOptimizeRepartitionOperations();
}
}
+ private void optimizeKTableSourceTopics() {
+ LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
+ tableSourceNodes.forEach(node -> ((TableSourceNode) node).reuseSourceTopicForChangeLog(true));
+ }
+
@SuppressWarnings("unchecked")
private void maybeOptimizeRepartitionOperations() {
maybeUpdateKeyChangingRepartitionNodeMap();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 53061dc..fa979b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -38,6 +38,7 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
private final ProcessorParameters<K, V> processorParameters;
private final String sourceName;
private final boolean isGlobalKTable;
+ private boolean shouldReuseSourceTopicForChangelog = false;
private TableSourceNode(final String nodeName,
final String sourceName,
@@ -57,6 +58,11 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
this.materializedInternal = materializedInternal;
}
+
+ public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicForChangelog) {
+ this.shouldReuseSourceTopicForChangelog = shouldReuseSourceTopicForChangelog;
+ }
+
@Override
public String toString() {
return "TableSourceNode{" +
@@ -104,7 +110,11 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
if (ktableSource.queryableName() != null) {
topologyBuilder.addStateStore(storeBuilder, nodeName());
- topologyBuilder.markSourceStoreAndTopic(storeBuilder, topicName);
+
+ if (shouldReuseSourceTopicForChangelog) {
+ storeBuilder.withLoggingDisabled();
+ topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 334adce..792df53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -123,9 +123,6 @@ public class InternalTopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
- // TODO: this is only temporary for 2.0 and should be removed
- private final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
-
public static class StateStoreFactory {
private final StoreBuilder builder;
private final Set<String> users = new HashSet<>();
@@ -359,10 +356,6 @@ public class InternalTopologyBuilder {
globalStateStores.put(storeBuilder.name(), storeBuilder.build());
}
- // adjust the topology if optimization is turned on.
- // TODO: to be removed post 2.0
- adjust(config);
-
return this;
}
@@ -606,7 +599,7 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
- private void connectSourceStoreAndTopic(final String sourceStoreName,
+ public void connectSourceStoreAndTopic(final String sourceStoreName,
final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
throw new TopologyException("Source store " + sourceStoreName + " is already added.");
@@ -614,14 +607,6 @@ public class InternalTopologyBuilder {
storeToChangelogTopic.put(sourceStoreName, topic);
}
- public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
- final String topic) {
- if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
- throw new TopologyException("Source store " + storeBuilder.name() + " is already used.");
- }
- storeToSourceChangelogTopic.put(storeBuilder, topic);
- }
-
public final void addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
internalTopicNames.add(topicName);
@@ -1071,25 +1056,6 @@ public class InternalTopologyBuilder {
return Collections.unmodifiableMap(topicGroups);
}
- // Adjust the generated topology based on the configs.
- // Not exposed as public API and should be removed post 2.0
- private void adjust(final StreamsConfig config) {
- final boolean enableOptimization20 =
- config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
-
- if (enableOptimization20) {
- for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
- final StoreBuilder storeBuilder = entry.getKey();
- final String topicName = entry.getValue();
-
- // update store map to disable logging for this store
- storeBuilder.withLoggingDisabled();
- addStateStore(storeBuilder, true);
- connectSourceStoreAndTopic(storeBuilder.name(), topicName);
- }
- }
- }
-
private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index b51eac8..f140546 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -349,9 +349,9 @@ public class StreamsBuilderTest {
public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
final String topic = "topic";
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
- final Topology topology = builder.build();
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+ final Topology topology = builder.build(props);
final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 830514b..f21dbfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -146,7 +146,7 @@ public class RestoreIntegrationTest {
}
});
- kafkaStreams = new KafkaStreams(builder.build(), props);
+ kafkaStreams = new KafkaStreams(builder.build(props), props);
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
startupLatch.countDown();