You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/30 18:13:29 UTC
[kafka] branch trunk updated: KAFKA-6957 make
InternalTopologyBuilder accessible from AbstractStream subclasses (#5085)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14171fa KAFKA-6957 make InternalTopologyBuilder accessible from AbstractStream subclasses (#5085)
14171fa is described below
commit 14171fa8b43e445afd7731463e0b548422602712
Author: Florian Hussonnois <fh...@gmail.com>
AuthorDate: Wed May 30 20:13:24 2018 +0200
KAFKA-6957 make InternalTopologyBuilder accessible from AbstractStream subclasses (#5085)
Currently, the AbstractStream class defines a copy-constructor that allows extending the KStream and KTable APIs with new methods without impacting the public interface.
However, adding a new processor and/or store to the topology is done through the internalTopologyBuilder, which is not accessible from AbstractStream subclasses defined outside of the package (package visibility).
Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../apache/kafka/streams/kstream/internals/AbstractStream.java | 8 +++++++-
.../kafka/streams/kstream/internals/AbstractStreamTest.java | 2 +-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 7bc7a15..aa2a727 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import java.util.HashSet;
import java.util.Objects;
@@ -33,7 +34,7 @@ public abstract class AbstractStream<K> {
protected final InternalStreamsBuilder builder;
protected final String name;
- final Set<String> sourceNodes;
+ protected final Set<String> sourceNodes;
// This copy-constructor will allow to extend KStream
// and KTable APIs with new methods without impacting the public interface.
@@ -53,6 +54,11 @@ public abstract class AbstractStream<K> {
this.sourceNodes = sourceNodes;
}
+ // This method allows to expose the InternalTopologyBuilder instance
+ // to subclasses that extend AbstractStream class.
+ protected InternalTopologyBuilder internalTopologyBuilder() {
+ return builder.internalTopologyBuilder;
+ }
Set<String> ensureJoinableWith(final AbstractStream<K> other) {
Set<String> allSourceNodes = new HashSet<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 4c3066f..a0cda1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -102,7 +102,7 @@ public class AbstractStreamTest {
KStream<K, V> randomFilter() {
String name = builder.newProcessorName("RANDOM-FILTER-");
- builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name);
+ internalTopologyBuilder().addProcessor(name, new ExtendedKStreamDummy(), this.name);
return new KStreamImpl<>(builder, name, sourceNodes, false);
}
}
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.