You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/11 11:00:59 UTC
kafka git commit: KAFKA-5816;
[FOLLOW UP] create ProducedInternal class
Repository: kafka
Updated Branches:
refs/heads/trunk c5464edbb -> 779714c08
KAFKA-5816; [FOLLOW UP] create ProducedInternal class
Create `ProducedInternal` and remove getters from `Produced`
Author: Damian Guy <da...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3810 from dguy/kafka-5816-follow-up
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/779714c0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/779714c0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/779714c0
Branch: refs/heads/trunk
Commit: 779714c08bc16fcdd6fe7c39e92a7f73ebebdb71
Parents: c5464ed
Author: Damian Guy <da...@gmail.com>
Authored: Mon Sep 11 12:00:54 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 11 12:00:54 2017 +0100
----------------------------------------------------------------------
.../apache/kafka/streams/kstream/Produced.java | 24 +++++-------
.../streams/kstream/internals/KStreamImpl.java | 12 ++++--
.../kstream/internals/ProducedInternal.java | 39 ++++++++++++++++++++
3 files changed, 57 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/779714c0/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
index 488bd15..b2513ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Produced.java
@@ -30,9 +30,9 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
*/
public class Produced<K, V> {
- private Serde<K> keySerde;
- private Serde<V> valueSerde;
- private StreamPartitioner<? super K, ? super V> partitioner;
+ protected Serde<K> keySerde;
+ protected Serde<V> valueSerde;
+ protected StreamPartitioner<? super K, ? super V> partitioner;
private Produced(final Serde<K> keySerde,
final Serde<V> valueSerde,
@@ -42,6 +42,12 @@ public class Produced<K, V> {
this.partitioner = partitioner;
}
+ protected Produced(final Produced<K, V> produced) {
+ this.keySerde = produced.keySerde;
+ this.valueSerde = produced.valueSerde;
+ this.partitioner = produced.partitioner;
+ }
+
/**
* Create a Produced instance with provided keySerde and valueSerde.
* @param keySerde Serde to use for serializing the key
@@ -148,16 +154,4 @@ public class Produced<K, V> {
this.keySerde = keySerde;
return this;
}
-
- public Serde<K> keySerde() {
- return keySerde;
- }
-
- public Serde<V> valueSerde() {
- return valueSerde;
- }
-
- public StreamPartitioner<? super K, ? super V> streamPartitioner() {
- return partitioner;
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/779714c0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7adc426..41da536 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -378,10 +378,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
- to(topic, produced);
+ final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced);
+ to(topic, producedInternal);
return builder.stream(Collections.singleton(topic),
- new ConsumedInternal<>(produced.keySerde(),
- produced.valueSerde(),
+ new ConsumedInternal<>(producedInternal.keySerde(),
+ producedInternal.valueSerde(),
new FailOnInvalidTimestamp(),
null));
}
@@ -455,6 +456,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
public void to(final String topic, final Produced<K, V> produced) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(produced, "Produced can't be null");
+ to(topic, new ProducedInternal<>(produced));
+
+ }
+
+ private void to(final String topic, final ProducedInternal<K, V> produced) {
final String name = builder.newName(SINK_NAME);
final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer();
http://git-wip-us.apache.org/repos/asf/kafka/blob/779714c0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
new file mode 100644
index 0000000..f7f68fe
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ProducedInternal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+class ProducedInternal<K, V> extends Produced<K, V> {
+ ProducedInternal(final Produced<K, V> produced) {
+ super(produced);
+ }
+
+ Serde<K> keySerde() {
+ return keySerde;
+ }
+
+ Serde<V> valueSerde() {
+ return valueSerde;
+ }
+
+ StreamPartitioner<? super K, ? super V> streamPartitioner() {
+ return partitioner;
+ }
+}