Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/27 23:09:59 UTC

[kafka] branch trunk updated: KAFKA-6386: Use Properties instead of StreamsConfig in KafkaStreams constructor

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 964693e  KAFKA-6386: Use Properties instead of StreamsConfig in KafkaStreams constructor
964693e is described below

commit 964693e40d9303d5d754010f03731d2134bae749
Author: Boyang Chen <bc...@outlook.com>
AuthorDate: Tue Mar 27 16:07:49 2018 -0700

    KAFKA-6386: Use Properties instead of StreamsConfig in KafkaStreams constructor
    
    This pull request targets https://issues.apache.org/jira/browse/KAFKA-6386
    This is a minor fix that deprecates the usage of `StreamsConfig` in favor of `java.util.Properties`.
    I created separate public constructors using `Properties` to replace the old ones,
    and prioritized the new constructors in the `KafkaStreams.java` file.
    
    Since this is my first open source contribution, I'm very happy to get any
    comments or pointers on how to be more professional and do better next time. Thank you Guozhang guozhangwang and Liquan Ishiihara!
    
    Testing strategy: the existing unit tests should suffice to cover this change.
    
    Author: cs427fa16staff <bc...@outlook.com>
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
    
    Closes #4354 from abbccdda/starter
    
    Addressed GitHub review comments.
---
 docs/streams/upgrade-guide.html                    |   7 +-
 .../apache/kafka/streams/KafkaClientSupplier.java  |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     | 113 ++++++++++++++-------
 .../org/apache/kafka/streams/StreamsConfig.java    |   2 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |   2 +-
 .../integration/AbstractJoinIntegrationTest.java   |   4 +-
 .../PurgeRepartitionTopicIntegrationTest.java      |   2 +-
 .../integration/RegexSourceIntegrationTest.java    |  11 +-
 8 files changed, 91 insertions(+), 52 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index baf9633..d21d505 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,7 +86,12 @@
         to let users specify inner serdes if the default serde classes are windowed serdes.
         For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-265%3A+Make+Windowed+Serde+to+public+APIs">KIP-265</a>.
     </p>
-    
+
+     <p>
+        We have deprecated constructors of <code>KafkaStreams</code> that take a <code>StreamsConfig</code> as parameter.
+        Please use the other corresponding constructors that accept <code>java.util.Properties</code> instead.
+        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-245%3A+Use+Properties+instead+of+StreamsConfig+in+KafkaStreams+constructor">KIP-245</a>.
+    </p>
     <p>
       Kafka 1.2.0 allows to manipulate timestamps of output records using the Processor API (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API">KIP-251</a>).
       To enable this new feature, <code>ProcessorContext#forward(...)</code> was modified.
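
To make the deprecation note above concrete, here is a minimal migration sketch (not part of the commit; the application id and bootstrap servers are placeholder values) moving from the deprecated `StreamsConfig` constructor to its new `Properties`-based counterpart:

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class MigrationSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            // Placeholder values; substitute your own application id and brokers.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            final Topology topology = new StreamsBuilder().build();

            // Before (now deprecated): wrap the Properties in a StreamsConfig.
            // final KafkaStreams streams = new KafkaStreams(topology, new StreamsConfig(props));

            // After: pass the Properties directly; the constructor wraps them internally.
            final KafkaStreams streams = new KafkaStreams(topology, props);

            // Even if start() is never called, close() is required to avoid resource leaks.
            streams.close();
        }
    }

As the constructor bodies in the diff below show, the new overloads simply wrap the supplied `Properties` in a `StreamsConfig` internally, so existing configuration keys behave exactly as before.
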
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 2ea5218..8a6ec05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -26,7 +26,7 @@ import java.util.Map;
 /**
  * {@code KafkaClientSupplier} can be used to provide custom Kafka clients to a {@link KafkaStreams} instance.
  *
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig, KafkaClientSupplier)
+ * @see KafkaStreams#KafkaStreams(Topology, java.util.Properties, KafkaClientSupplier)
  */
 public interface KafkaClientSupplier {
     /**
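
The updated javadoc above points at the new three-argument constructor. Below is a minimal sketch of combining it with a custom client supplier, mirroring the pattern the integration tests further down use; note that `DefaultKafkaClientSupplier` lives in the internal package `org.apache.kafka.streams.processor.internals`, so production code would more typically implement `KafkaClientSupplier` directly:

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

    public class CustomSupplierSketch {
        // Returns a KafkaStreams instance whose consumers are created by a custom supplier.
        static KafkaStreams withCustomConsumer(final Topology topology, final Properties props) {
            return new KafkaStreams(topology, props, new DefaultKafkaClientSupplier() {
                @Override
                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                    // Hook point: wrap or instrument the consumer before Streams uses it.
                    return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                }
            });
        }
    }
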
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 1a70e46..186276c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -515,31 +515,18 @@ public class KafkaStreams {
     }
 
     /**
-     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     * Create a {@code KafkaStreams} instance.
+     * <p>
+     * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
+     * you still must {@link #close()} it to avoid resource leaks.
+     *
+     * @param topology the topology specifying the computational logic
+     * @param props    properties for {@link StreamsConfig}
+     * @throws StreamsException if any fatal error occurs
      */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+    public KafkaStreams(final Topology topology,
                         final Properties props) {
-        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
-    }
-
-    /**
-     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig)} instead
-     */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
-                        final StreamsConfig config) {
-        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
-    }
-
-    /**
-     * @deprecated use {@link #KafkaStreams(Topology, StreamsConfig, KafkaClientSupplier)} instead
-     */
-    @Deprecated
-    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
-                        final StreamsConfig config,
-                        final KafkaClientSupplier clientSupplier) {
-        this(builder.internalTopologyBuilder, config, clientSupplier);
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
     }
 
     /**
@@ -548,13 +535,16 @@ public class KafkaStreams {
      * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
      * you still must {@link #close()} it to avoid resource leaks.
      *
-     * @param topology the topology specifying the computational logic
-     * @param props   properties for {@link StreamsConfig}
+     * @param topology       the topology specifying the computational logic
+     * @param props          properties for {@link StreamsConfig}
+     * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
+     *                       for the new {@code KafkaStreams} instance
      * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
-                        final Properties props) {
-        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+                        final Properties props,
+                        final KafkaClientSupplier clientSupplier) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, Time.SYSTEM);
     }
 
     /**
@@ -563,13 +553,15 @@ public class KafkaStreams {
      * Note: even if you never call {@link #start()} on a {@code KafkaStreams} instance,
      * you still must {@link #close()} it to avoid resource leaks.
      *
-     * @param topology the topology specifying the computational logic
-     * @param config  the Kafka Streams configuration
+     * @param topology       the topology specifying the computational logic
+     * @param props          properties for {@link StreamsConfig}
+     * @param time           {@code Time} implementation; cannot be null
      * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
-                        final StreamsConfig config) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+                        final Properties props,
+                        final Time time) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier(), time);
     }
 
     /**
@@ -579,25 +571,70 @@ public class KafkaStreams {
      * you still must {@link #close()} it to avoid resource leaks.
      *
      * @param topology       the topology specifying the computational logic
-     * @param config         the Kafka Streams configuration
+     * @param props          properties for {@link StreamsConfig}
      * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
      *                       for the new {@code KafkaStreams} instance
+     * @param time           {@code Time} implementation; cannot be null
      * @throws StreamsException if any fatal error occurs
      */
     public KafkaStreams(final Topology topology,
+                        final Properties props,
+                        final KafkaClientSupplier clientSupplier,
+                        final Time time) {
+        this(topology.internalTopologyBuilder, new StreamsConfig(props), clientSupplier, time);
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+                        final Properties props) {
+        this(builder.internalTopologyBuilder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+                        final StreamsConfig config) {
+        this(builder.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties, KafkaClientSupplier)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final org.apache.kafka.streams.processor.TopologyBuilder builder,
+                        final StreamsConfig config,
+                        final KafkaClientSupplier clientSupplier) {
+        this(builder.internalTopologyBuilder, config, clientSupplier);
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final Topology topology,
+                        final StreamsConfig config) {
+        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+    }
+
+    /**
+     * @deprecated use {@link #KafkaStreams(Topology, Properties, KafkaClientSupplier)} instead
+     */
+    @Deprecated
+    public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final KafkaClientSupplier clientSupplier) {
         this(topology.internalTopologyBuilder, config, clientSupplier);
     }
 
     /**
-     * Create a {@code KafkaStreams} instance.
-     *
-     * @param topology       the topology specifying the computational logic
-     * @param config         the Kafka Streams configuration
-     * @param time           {@code Time} implementation; cannot be null
-     * @throws StreamsException if any fatal error occurs
+     * @deprecated use {@link #KafkaStreams(Topology, Properties, Time)} instead
      */
+    @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config,
                         final Time time) {
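
The `(Topology, Properties, Time)` overload added above makes the clock injectable. A minimal sketch, assuming the `MockTime` utility from `org.apache.kafka.common.utils` (a test artifact; production code would normally omit the `Time` argument, which defaults to `Time.SYSTEM`):

    import java.util.Properties;

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.Topology;

    public class TimeInjectionSketch {
        // Builds a KafkaStreams instance on a controllable clock, e.g. for deterministic tests.
        static KafkaStreams withMockTime(final Topology topology, final Properties props) {
            final Time time = new MockTime();
            return new KafkaStreams(topology, props, time);
        }
    }
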
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ecfcad8..0eaf9ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -123,7 +123,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  * </ul>
  *
  *
- * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig)
+ * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, Properties)
  * @see ConsumerConfig
  * @see ProducerConfig
  */
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 9770173..1dc8602 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -130,7 +130,7 @@ public class KafkaStreamsTest {
             Collections.<String>emptySet(), nodes.get(0));
         MockClientSupplier clientSupplier = new MockClientSupplier();
         clientSupplier.setClusterForAdminClient(cluster);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(props), clientSupplier);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, clientSupplier);
         streams.close();
         TestUtils.waitForCondition(new TestCondition() {
             @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 16d2611..80ab606 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -193,7 +193,7 @@ public abstract class AbstractJoinIntegrationTest {
         assert expectedResult.size() == input.size();
 
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+        streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
 
         String expectedFinalResult = null;
 
@@ -234,7 +234,7 @@ public abstract class AbstractJoinIntegrationTest {
      */
     void runTest(final String expectedFinalResult, final String storeName) throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+        streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
 
         try {
             streams.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 4b983cf..9dfb6dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -166,7 +166,7 @@ public class PurgeRepartitionTopicIntegrationTest {
                .groupBy(MockMapper.selectKeyKeyValueMapper())
                .count();
 
-        kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(streamsConfiguration), time);
+        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, time);
     }
 
     @After
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 1da4c58..57ed161 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -145,7 +145,7 @@ public class RegexSourceIntegrationTest {
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         final List<String> assignedTopics = new ArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+        streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                 return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@@ -185,8 +185,6 @@ public class RegexSourceIntegrationTest {
         final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
         final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
 
-        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);
-
         CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -196,7 +194,7 @@ public class RegexSourceIntegrationTest {
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
         final List<String> assignedTopics = new ArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+        streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                 return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@@ -333,9 +331,8 @@ public class RegexSourceIntegrationTest {
 
             final List<String> leaderAssignment = new ArrayList<>();
             final List<String> followerAssignment = new ArrayList<>();
-            StreamsConfig config = new StreamsConfig(streamsConfiguration);
 
-            partitionedStreamsLeader  = new KafkaStreams(builderLeader.build(), config, new DefaultKafkaClientSupplier() {
+            partitionedStreamsLeader  = new KafkaStreams(builderLeader.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                 @Override
                 public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                     return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@@ -347,7 +344,7 @@ public class RegexSourceIntegrationTest {
 
                 }
             });
-            partitionedStreamsFollower  = new KafkaStreams(builderFollower.build(), config, new DefaultKafkaClientSupplier() {
+            partitionedStreamsFollower  = new KafkaStreams(builderFollower.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                 @Override
                 public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                     return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.