Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/09 01:22:16 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies have a blocking option

guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r765364839



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void maybeNotifyTopologyVersionWaiters(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion;
+                if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion);

Review comment:
       Sorry for the late comments after the PR has been merged, but I feel this log entry could be confusing: we always update the thread version, as in line 148, but we only log it after ALL threads have been updated to at least that version. This means the log entry may be printed later than the update actually happens.
   
   Another minor thing is that this function actually does two things: it updates the thread version and then maybe completes the version waiters, but the function name indicates only the latter. Maybe it's better to extract line 148 along with line 156 into the caller of this function? Or we could rename the function to `updateThreadVersionAndMayBe...`. Personally I prefer the first option.
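
   To make the first option concrete, here is a rough sketch of how the split might look. It is not the actual `TopologyMetadata` code: the class `TopologyVersionSketch`, the `Waiter` type, and the methods `bumpVersion` and `updateThreadVersion` are hypothetical names made up for illustration, and logging is replaced with `System.out` so the example is self-contained. The idea is that the thread records and logs its own version right away, and the notify function only completes waiters.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for TopologyMetadata; not the actual Kafka class.
class TopologyVersionSketch {
    // Hypothetical waiter: a future tied to the topology version it waits for.
    static final class Waiter {
        final long topologyVersion;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(final long topologyVersion) {
            this.topologyVersion = topologyVersion;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();
    private long topologyVersion = 0L;

    void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    // Bumps the topology version and returns a future that completes
    // once every registered thread has reached the new version.
    CompletableFuture<Void> bumpVersion() {
        lock.lock();
        try {
            final Waiter waiter = new Waiter(++topologyVersion);
            waiters.add(waiter);
            return waiter.future;
        } finally {
            lock.unlock();
        }
    }

    // Step 1, called by the thread itself: record the new version and log it
    // immediately, so the log line matches the time of the actual update.
    void updateThreadVersion(final String threadName) {
        lock.lock();
        try {
            threadVersions.put(threadName, topologyVersion);
            System.out.printf("Thread %s is now on topology version %d%n", threadName, topologyVersion);
        } finally {
            lock.unlock();
        }
    }

    // Step 2: does only what the name says, completing each waiter whose
    // version has been reached by all registered threads.
    void maybeNotifyTopologyVersionWaiters() {
        lock.lock();
        try {
            final Iterator<Waiter> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                final Waiter waiter = iterator.next();
                if (threadVersions.values().stream().allMatch(v -> v >= waiter.topologyVersion)) {
                    waiter.future.complete(null);
                    iterator.remove();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
```

   With this shape the caller would invoke `updateThreadVersion(threadName)` followed by `maybeNotifyTopologyVersionWaiters()`, and the "is now on topology version" line is printed once per thread at the moment that thread catches up, rather than once per completed waiter.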



