You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/02 18:43:00 UTC

[jira] [Commented] (KAFKA-4706) Unify StreamsKafkaClient instances

    [ https://issues.apache.org/jira/browse/KAFKA-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308485#comment-16308485 ] 

ASF GitHub Bot commented on KAFKA-4706:
---------------------------------------

guozhangwang closed pull request #2631: KAFKA-4706: Unify StreamsKafkaClient instances
URL: https://github.com/apache/kafka/pull/2631
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c57804e5325..5d47beab457 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -58,7 +58,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -133,6 +132,7 @@
     // usage only and should not be exposed to users at all.
     private final UUID processId;
     private final StreamsMetadataState streamsMetadataState;
+    private final StreamsKafkaClient streamsKafkaClient;
 
     private final StreamsConfig config;
 
@@ -324,6 +324,7 @@ public KafkaStreams(final TopologyBuilder builder,
         threadState = new HashMap<>(threads.length);
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+        streamsKafkaClient = new StreamsKafkaClient(config);
 
         final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
 
@@ -347,15 +348,16 @@ public KafkaStreams(final TopologyBuilder builder,
 
         for (int i = 0; i < threads.length; i++) {
             threads[i] = new StreamThread(builder,
-                                          config,
-                                          clientSupplier,
-                                          applicationId,
-                                          clientId,
-                                          processId,
-                                          metrics,
-                                          time,
-                                          streamsMetadataState,
-                                          cacheSizeBytes);
+                    config,
+                    clientSupplier,
+                    applicationId,
+                    clientId,
+                    processId,
+                    metrics,
+                    time,
+                    streamsMetadataState,
+                    cacheSizeBytes);
+            threads[i].setStreamsKafkaClient(streamsKafkaClient);
             threads[i].setStateListener(new StreamStateListener());
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
@@ -387,15 +389,8 @@ private static HostInfo parseHostInfo(final String endPoint) {
      * @throws StreamsException if brokers have version 0.10.0.x
      */
     private void checkBrokerVersionCompatibility() throws StreamsException {
-        final StreamsKafkaClient client = new StreamsKafkaClient(config);
 
-        client.checkBrokerCompatibility();
-
-        try {
-            client.close();
-        } catch (final IOException e) {
-            log.warn("Could not close StreamKafkaClient.", e);
-        }
+        streamsKafkaClient.checkBrokerCompatibility();
 
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 1ad6dbcbfb4..d0b1336ecb3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -207,7 +207,7 @@ public void configure(Map<String, ?> configs) {
         }
 
         internalTopicManager = new InternalTopicManager(
-                new StreamsKafkaClient(this.streamThread.config),
+                this.streamThread.getStreamsKafkaClient(),
                 configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
                 configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
                         (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 01281423cca..1a7cbfee3bf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -203,6 +203,7 @@ private synchronized void setStateWhenNotInPendingShutdown(final State newState)
     final StateDirectory stateDirectory;
     private String originalReset;
     private StreamPartitionAssignor partitionAssignor = null;
+    private StreamsKafkaClient streamsKafkaClient;
     private boolean cleanRun = false;
     private long timerStartedMs;
     private long lastCleanMs;
@@ -1102,6 +1103,22 @@ public String toString(final String indent) {
         return sb.toString();
     }
 
+    /**
+     *
+     * @return
+     */
+    public StreamsKafkaClient getStreamsKafkaClient() {
+        return streamsKafkaClient;
+    }
+
+    /**
+     *
+     * @param streamsKafkaClient
+     */
+    public void setStreamsKafkaClient(StreamsKafkaClient streamsKafkaClient) {
+        this.streamsKafkaClient = streamsKafkaClient;
+    }
+
     /**
      * This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and
      * overrides one of its functions for efficiency


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Unify StreamsKafkaClient instances
> ----------------------------------
>
>                 Key: KAFKA-4706
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4706
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Matthias J. Sax
>            Assignee: Sharad
>            Priority: Minor
>              Labels: beginner, easyfix, newbie
>
> Kafka Streams currently used two instances of {{StreamsKafkaClient}} (one in {{KafkaStreams}} and one in {{InternalTopicManager}}).
> We want to unify both such that only a single instance is used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)