You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/05/30 18:13:29 UTC

[kafka] branch trunk updated: KAFKA-6957 make InternalTopologyBuilder accessible from AbstractStream subclasses (#5085)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14171fa  KAFKA-6957 make InternalTopologyBuilder accessible from AbstractStream subclasses (#5085)
14171fa is described below

commit 14171fa8b43e445afd7731463e0b548422602712
Author: Florian Hussonnois <fh...@gmail.com>
AuthorDate: Wed May 30 20:13:24 2018 +0200

    KAFKA-6957 make InternalTopologyBuilder accessible from AbstractStream subclasses (#5085)
    
    Currently, the AbstractStream class defines a copy-constructor that allows extending the KStream and KTable APIs with new methods without impacting the public interface.
    
    However, adding a new processor and/or store to the topology is done through the internalTopologyBuilder, which is not accessible from AbstractStream subclasses defined outside of the package (package visibility).
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../apache/kafka/streams/kstream/internals/AbstractStream.java    | 8 +++++++-
 .../kafka/streams/kstream/internals/AbstractStreamTest.java       | 2 +-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 7bc7a15..aa2a727 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
 import java.util.HashSet;
 import java.util.Objects;
@@ -33,7 +34,7 @@ public abstract class AbstractStream<K> {
 
     protected final InternalStreamsBuilder builder;
     protected final String name;
-    final Set<String> sourceNodes;
+    protected final Set<String> sourceNodes;
 
     // This copy-constructor will allow to extend KStream
     // and KTable APIs with new methods without impacting the public interface.
@@ -53,6 +54,11 @@ public abstract class AbstractStream<K> {
         this.sourceNodes = sourceNodes;
     }
 
+    // This method allows to expose the InternalTopologyBuilder instance
+    // to subclasses that extend AbstractStream class.
+    protected InternalTopologyBuilder internalTopologyBuilder() {
+        return builder.internalTopologyBuilder;
+    }
 
     Set<String> ensureJoinableWith(final AbstractStream<K> other) {
         Set<String> allSourceNodes = new HashSet<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 4c3066f..a0cda1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -102,7 +102,7 @@ public class AbstractStreamTest {
 
         KStream<K, V> randomFilter() {
             String name = builder.newProcessorName("RANDOM-FILTER-");
-            builder.internalTopologyBuilder.addProcessor(name, new ExtendedKStreamDummy(), this.name);
+            internalTopologyBuilder().addProcessor(name, new ExtendedKStreamDummy(), this.name);
             return new KStreamImpl<>(builder, name, sourceNodes, false);
         }
     }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.