Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/23 01:37:24 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

ableegoldman commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r656694680



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+public class TaskMetadataImpl implements TaskMetadata {
+
+    private final TaskId taskId;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private final Optional<Long> timeCurrentIdlingStarted;
+
+    public TaskMetadataImpl(final TaskId taskId,
+                            final Set<TopicPartition> topicPartitions,
+                            final Map<TopicPartition, Long> committedOffsets,
+                            final Map<TopicPartition, Long> endOffsets,
+                            final Optional<Long> timeCurrentIdlingStarted) {
+        this.taskId = taskId;
+        this.topicPartitions = topicPartitions;
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;
+        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets() {
+        return endOffsets;
+    }
+
+    @Override
+    public Optional<Long> timeCurrentIdlingStarted() {
+        return timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final org.apache.kafka.streams.processor.internals.TaskMetadataImpl that = (org.apache.kafka.streams.processor.internals.TaskMetadataImpl) o;

Review comment:
       Does this need to be fully qualified? There's only one `TaskMetadataImpl` class, right?
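
       For illustration only, here is a minimal sketch of an `equals`/`hashCode` pair that casts using the simple class name. `TaskInfoSketch` and its two fields are hypothetical stand-ins, not the PR's `TaskMetadataImpl`; the sketch only assumes that no other class with the same simple name is in scope, in which case the package prefix is not needed.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for TaskMetadataImpl, reduced to two fields.
final class TaskInfoSketch {
    private final String taskId;
    private final Map<String, Long> committedOffsets;

    TaskInfoSketch(final String taskId, final Map<String, Long> committedOffsets) {
        this.taskId = taskId;
        this.committedOffsets = committedOffsets;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // The simple name resolves to this class, so no package prefix is required.
        final TaskInfoSketch that = (TaskInfoSketch) o;
        return Objects.equals(taskId, that.taskId)
            && Objects.equals(committedOffsets, that.committedOffsets);
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskId, committedOffsets);
    }
}
```

       A fully qualified name would only become necessary if a second type with the same simple name were imported into, or declared in, the same file.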

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1478,8 +1499,36 @@ public void cleanUp() {
      * @param storeName the {@code storeName} to find metadata for
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
      * this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead
      */
-    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
+     * <ul>
+     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @param storeName the {@code storeName} to find metadata for
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
+     * this application
+     */
+    public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {

Review comment:
       I don't feel too strongly about any of these names, but it's probably best to keep them (a) consistent with each other, where possible, and (b) consistent with the return type. Assuming we go with something like `allStreamsMetadata` for the new method that replaces `allMetadata()` (currently `allRunningMetadata`), maybe something like `streamsMetadataForStore` would be appropriate for this one?
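
       As a rough sketch of how the suggested names would read side by side, assuming the names from this comment were adopted: `MetadataClientSketch` and `InstanceInfoSketch` are hypothetical, simplified types and not the `KafkaStreams` API. Unlike the PR, the deprecated method here returns the same type as the new one, so it can delegate directly without mapping to an old class.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for a StreamsMetadata-like type.
final class InstanceInfoSketch {
    final String host;
    final List<String> storeNames;

    InstanceInfoSketch(final String host, final List<String> storeNames) {
        this.host = host;
        this.storeNames = storeNames;
    }
}

// Hypothetical client; method names follow the suggestion in this comment.
final class MetadataClientSketch {
    private final List<InstanceInfoSketch> instances;

    MetadataClientSketch(final List<InstanceInfoSketch> instances) {
        this.instances = instances;
    }

    // Both new names contain "StreamsMetadata", mirroring the return type.
    Collection<InstanceInfoSketch> allStreamsMetadata() {
        return instances;
    }

    Collection<InstanceInfoSketch> streamsMetadataForStore(final String storeName) {
        return instances.stream()
            .filter(instance -> instance.storeNames.contains(storeName))
            .collect(Collectors.toList());
    }

    /**
     * @deprecated use {@link #streamsMetadataForStore(String)} instead
     */
    @Deprecated
    Collection<InstanceInfoSketch> allMetadataForStore(final String storeName) {
        // The old name simply forwards to the new one.
        return streamsMetadataForStore(storeName);
    }
}
```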

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -27,10 +27,12 @@
 
 /**
  * Represents the state of a single task running within a {@link KafkaStreams} application.
+ * @deprecated since 3.0, not intended for public use, use {@link org.apache.kafka.streams.TaskMetadata} instead.

Review comment:
       ```suggestion
    * @deprecated since 3.0, use {@link org.apache.kafka.streams.TaskMetadata} instead.
   ```
   I think people might be a bit confused if we say "not intended for public use"; let's just point them to the new interface.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -229,7 +230,7 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
                                  final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
         if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
             allMetadata = Collections.emptyList();
-            localMetadata.set(new StreamsMetadata(thisHost,
+            localMetadata.set(new StreamsMetadataImpl(thisHost,
                                                   Collections.emptySet(),
                                                   Collections.emptySet(),
                                                   Collections.emptySet(),

Review comment:
       nit: fix alignment (here and in the change below)

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1458,8 +1457,30 @@ public void cleanUp() {
      * Note: this is a point in time view and it may change due to partition reassignment.
      *
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata}
      */
-    public Collection<StreamsMetadata> allMetadata() {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
+     * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     */
+    public Collection<StreamsMetadata> allRunningMetadata() {

Review comment:
       How about `allStreamsMetadata`? I think that gets at the core difference here, which is that this returns metadata for all Streams instances.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet());
+

Review comment:
       nit: fix alignment

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer<StreamThread> consumer) {
         for (final StreamThread thread : copy) consumer.accept(thread);
     }
 
+    /**
+     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+     *
+     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
+     * @deprecated since 3.0 use {@link #threadsMetadata()}
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
+        return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
+                threadMetadata.threadName(),
+                threadMetadata.threadState(),
+                threadMetadata.consumerClientId(),
+                threadMetadata.restoreConsumerClientId(),
+                threadMetadata.producerClientIds(),
+                threadMetadata.adminClientId(),
+                threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet()),
+                threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet())))
+                .collect(Collectors.toSet());
+    }
+
     /**
      * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
      *
      * @return the set of {@link ThreadMetadata}.
      */
-    public Set<ThreadMetadata> localThreadsMetadata() {
+    public Set<ThreadMetadata> threadsMetadata() {

Review comment:
       `localThreadMetadata` sounds better to me (honestly, even better than the original `localThreadsMetadata`), but I'm fine with `threadsMetadata` too.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    Set<TopicPartition> standbyTopicPartitions();
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    Set<String> standbyStateStoreNames();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().host();}
+     */
+    String host();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().port();}
+     */
+    int port();
+
+    /**
+     * Compares the specified object with this StreamsMetadata. Returns {@code true} if and only if the specified object is
+     * also a StreamsMetadata and for both {@code hostInfo()} are equal, and {@code stateStoreNames()}, {@code topicPartitions()},
+     * {@code standbyStateStoreNames()}, and {@code standbyTopicPartitions()} contain the same elements.
+     */
+    boolean equals(Object o);
+
+    /**
+     * Returns the hash code value for this TaskMetadata. The hash code of a list is defined to be the result of the following calculation:
+     * <pre>
+     * {@code
+     * Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(), standbyStateStoreNames(), standbyTopicPartitions());
+     * }

Review comment:
       missing closing tag `</pre>`?
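
       For reference, a minimal sketch of a doc comment in which the `<pre>` block is closed before the comment ends. `HostStoresSketch` is a hypothetical, reduced stand-in with two fields rather than the five in the PR's interface, and the description wording is an assumption about what was intended (the quoted Javadoc refers to `TaskMetadata` and to "a list", which looks like a copy-paste leftover).

```java
import java.util.Objects;
import java.util.Set;

// Hypothetical, reduced stand-in for a StreamsMetadata implementation.
final class HostStoresSketch {
    private final String host;
    private final Set<String> stateStoreNames;

    HostStoresSketch(final String host, final Set<String> stateStoreNames) {
        this.host = host;
        this.stateStoreNames = stateStoreNames;
    }

    String host() {
        return host;
    }

    Set<String> stateStoreNames() {
        return stateStoreNames;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final HostStoresSketch that = (HostStoresSketch) o;
        return Objects.equals(host, that.host)
            && Objects.equals(stateStoreNames, that.stateStoreNames);
    }

    /**
     * Returns the hash code value for this instance, computed as:
     * <pre>
     * {@code
     * Objects.hash(host(), stateStoreNames());
     * }
     * </pre>
     */
    @Override
    public int hashCode() {
        return Objects.hash(host, stateStoreNames);
    }
}
```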



