You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/11 11:00:59 UTC

kafka git commit: KAFKA-5816; [FOLLOW UP] create ProducedInternal class

Repository: kafka
Updated Branches:
  refs/heads/trunk c5464edbb -> 779714c08


KAFKA-5816; [FOLLOW UP] create ProducedInternal class

Create `ProducedInternal`, an internal subclass that exposes the serde and partitioner accessors, and remove the public getters from `Produced`.

Author: Damian Guy <da...@gmail.com>

Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3810 from dguy/kafka-5816-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/779714c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/779714c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/779714c0

Branch: refs/heads/trunk
Commit: 779714c08bc16fcdd6fe7c39e92a7f73ebebdb71
Parents: c5464ed
Author: Damian Guy <da...@gmail.com>
Authored: Mon Sep 11 12:00:54 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 11 12:00:54 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/Produced.java  | 24 +++++-------
 .../streams/kstream/internals/KStreamImpl.java  | 12 ++++--
 .../kstream/internals/ProducedInternal.java     | 39 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/779714c0/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index 488bd15..b2513ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -30,9 +30,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
  */
 public class Produced<K, V> {
 
-    private Serde<K> keySerde;
-    private Serde<V> valueSerde;
-    private StreamPartitioner<? super K, ? super V> partitioner;
+    protected Serde<K> keySerde;
+    protected Serde<V> valueSerde;
+    protected StreamPartitioner<? super K, ? super V> partitioner;
 
     private Produced(final Serde<K> keySerde,
                      final Serde<V> valueSerde,
@@ -42,6 +42,12 @@ public class Produced<K, V> {
         this.partitioner = partitioner;
     }
 
+    protected Produced(final Produced<K, V> produced) {
+        this.keySerde = produced.keySerde;
+        this.valueSerde = produced.valueSerde;
+        this.partitioner = produced.partitioner;
+    }
+
     /**
      * Create a Produced instance with provided keySerde and valueSerde.
      * @param keySerde      Serde to use for serializing the key
@@ -148,16 +154,4 @@ public class Produced<K, V> {
         this.keySerde = keySerde;
         return this;
     }
-
-    public Serde<K> keySerde() {
-        return keySerde;
-    }
-
-    public Serde<V> valueSerde() {
-        return valueSerde;
-    }
-
-    public StreamPartitioner<? super K, ? super V>  streamPartitioner() {
-        return partitioner;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/779714c0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7adc426..41da536 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -378,10 +378,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     @Override
     public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
-        to(topic, produced);
+        final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+        to(topic, producedInternal);
         return builder.stream(Collections.singleton(topic),
-                              new ConsumedInternal<>(produced.keySerde(),
-                                            produced.valueSerde(),
+                              new ConsumedInternal<>(producedInternal.keySerde(),
+                                            producedInternal.valueSerde(),
                                             new FailOnInvalidTimestamp(),
                                             null));
     }
@@ -455,6 +456,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
+        to(topic, new ProducedInternal<>(produced));
+
+    }
+
+    private void to(final String topic, final ProducedInternal<K, V> produced) {
         final String name = builder.newName(SINK_NAME);
         final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
         final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/779714c0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
new file mode 100644
index 0000000..f7f68fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+class ProducedInternal<K, V> extends Produced<K, V> {
+    ProducedInternal(final Produced<K, V> produced) {
+        super(produced);
+    }
+
+    Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    StreamPartitioner<? super K, ? super V> streamPartitioner() {
+        return partitioner;
+    }
+}