Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/02 08:48:35 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #9609: KAFKA-6687: restrict DSL to allow only Streams from the same source topics

cadonna commented on a change in pull request #9609:
URL: https://github.com/apache/kafka/pull/9609#discussion_r533990330



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+
+abstract public class SourceGraphNode<K, V> extends StreamsGraphNode {
+
+    private Collection<String> topicNames;
+    private Pattern topicPattern;
+    private final ConsumedInternal<K, V> consumedInternal;
+
+    public SourceGraphNode(final String nodeName,
+                            final Collection<String> topicNames,
+                            final ConsumedInternal<K, V> consumedInternal) {
+        super(nodeName);
+
+        this.topicNames = topicNames;
+        this.consumedInternal = consumedInternal;
+    }
+
+    public SourceGraphNode(final String nodeName,
+                            final Pattern topicPattern,
+                            final ConsumedInternal<K, V> consumedInternal) {
+
+        super(nodeName);
+
+        this.topicPattern = topicPattern;
+        this.consumedInternal = consumedInternal;
+    }
+
+    public Set<String> topicNames() {
+        return new HashSet<>(topicNames);

Review comment:
       Fair enough, and I think that this is nothing urgent or absolutely necessary. However, I would like to explain my line of thought. I think the interface of a class should also describe the constraints on the object and, as far as I can see, it does not make any sense to pass the same topic name multiple times to a source node.
   I do not see an issue with making the same conversion in more places, and actually this is not even true, because the only place we would do a conversion is in `StreamsBuilder#stream()`. All other dependent calls create a singleton collection, which can easily be replaced with a singleton set. Actually, I do not understand why `StreamsBuilder#stream()` takes a collection instead of a set.
   
   I am not sure I can follow your other argument:
   
   >  the actual callers of the constructor don't care at all whether it's a Set or any other Collection
    
   Do you refer to the creation of the singleton collection in the callers?
   
   As I said, I am not saying we need to follow my proposal. I just wanted to argue in favor of a cleaner and more descriptive interface.
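
To make the proposal concrete, here is a minimal sketch of what a `Set`-based constructor could look like. It is not the code in this pull request: `SourceNodeSketch`, `fromUserTopics`, and `fromSingleTopic` are hypothetical names, with the two factory methods standing in for the one caller that receives a `Collection` from the user and for the callers that wrap a single topic.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for SourceGraphNode (assumption, not the PR's code).
final class SourceNodeSketch {

    private final Set<String> topicNames;

    // Accepting a Set states in the signature that duplicate topic names carry no meaning.
    SourceNodeSketch(final Set<String> topicNames) {
        // Defensive copy so later changes to the caller's set do not leak into the node.
        this.topicNames = Collections.unmodifiableSet(new HashSet<>(topicNames));
    }

    Set<String> topicNames() {
        return topicNames;
    }

    // Stand-in for the single caller that gets a Collection from the user:
    // the Collection-to-Set conversion happens once, here.
    static SourceNodeSketch fromUserTopics(final Collection<String> topics) {
        return new SourceNodeSketch(new HashSet<>(topics));
    }

    // Stand-in for callers with exactly one topic: a singleton set replaces
    // the singleton collection.
    static SourceNodeSketch fromSingleTopic(final String topic) {
        return new SourceNodeSketch(Collections.singleton(topic));
    }
}
```

With this shape the getter no longer needs to build a new `HashSet` on every call, and duplicates are collapsed at the single conversion point instead of inside the node.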



