Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/08 12:26:51 UTC

[GitHub] [kafka] jlprat opened a new pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

jlprat opened a new pull request #10840:
URL: https://github.com/apache/kafka/pull/10840


   Implementation of [KIP-744](https://cwiki.apache.org/confluence/x/XIrOCg).
   
   Creates new Interfaces for TaskMetadata, ThreadMetadata, and
   StreamsMetadata, providing internal implementations for each of them.
   
   Deprecates the current TaskMetadata and ThreadMetadata under o.a.k.s.processor,
   and StreamsMetadata under o.a.k.s.state.
   
   Updates references in internal classes from the deprecated classes to the new interfaces.
   
   Deprecates the methods on KafkaStreams that return the deprecated ThreadMetadata and
   StreamsMetadata, and provides new ones returning the new interfaces.
   
   Updates Javadocs that reference deprecated classes and methods to point
   to the right ones.
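
The pattern described above (a public interface, an internal implementation, and a deprecated legacy class with a deprecated accessor) can be sketched roughly as follows. This is only an illustration under stated assumptions: every type and method name here (`ThreadMetadataSketch`, `ClientSketch`, `legacyThreadMetadata()`, and so on) is hypothetical and is not taken from the Kafka code base or from KIP-744.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for one of the new public interfaces.
interface ThreadMetadataSketch {
    String threadName();
    Set<String> activeTaskIds();
}

// Hypothetical internal implementation; callers only see the interface.
final class ThreadMetadataSketchImpl implements ThreadMetadataSketch {
    private final String threadName;
    private final Set<String> activeTaskIds;

    ThreadMetadataSketchImpl(final String threadName, final Set<String> activeTaskIds) {
        this.threadName = threadName;
        // Defensive copy so the returned metadata is read-only.
        this.activeTaskIds = Collections.unmodifiableSet(new HashSet<>(activeTaskIds));
    }

    @Override
    public String threadName() { return threadName; }

    @Override
    public Set<String> activeTaskIds() { return activeTaskIds; }
}

// Hypothetical old concrete class, kept only for backward compatibility.
@Deprecated
final class LegacyThreadMetadataSketch {
    private final String threadName;

    LegacyThreadMetadataSketch(final String threadName) { this.threadName = threadName; }

    String threadName() { return threadName; }
}

// Hypothetical client exposing both the deprecated and the new accessor.
final class ClientSketch {
    private final ThreadMetadataSketch thread =
        new ThreadMetadataSketchImpl("thread-1", new HashSet<>(Arrays.asList("0_0", "0_1")));

    @Deprecated
    LegacyThreadMetadataSketch legacyThreadMetadata() {
        return new LegacyThreadMetadataSketch(thread.threadName());
    }

    ThreadMetadataSketch threadMetadata() { return thread; }
}
```

Existing callers keep compiling against the deprecated accessor, while new code depends only on the interface, so the implementation can change without breaking the public API.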
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-868661683


   I am just waiting for the builds to finish and then I will merge.





[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-868693590


   Thanks a lot @cadonna and @ableegoldman for the reviews!





[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-868680664


   It seems that the test failures were in RaftClusterTest and SslTransportLayerTest. Both are known to be flaky, AFAIU.





[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-866870832


   @ableegoldman and @cadonna thanks a lot for your reviews! I think I addressed all of your comments. Let me know what you think about them.





[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r658722233



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.

Review comment:
       ```suggestion
    * Metadata of a Kafka Streams client.
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */

Review comment:
       ```suggestion
       /**
        * Source topic partitions of the active tasks of the Streams client.
        *
        * @return source topic partitions of the active tasks
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * This function will return a {@link TaskId} with basic task metadata
+     *
+     * @return the basic task metadata such as subtopology and partition id
+     */

Review comment:
       ```suggestion
       /**
        * Task ID of the task.
        *
        * @return task ID consisting of subtopology and partition ID
        */
   ```
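
To make "task ID consisting of subtopology and partition ID" concrete, here is a minimal sketch. It assumes the textual form `<subtopology>_<partition>` (for example `0_1`). `TaskIdSketch` and its `parse` method are hypothetical stand-ins and not the real `org.apache.kafka.streams.processor.TaskId`.

```java
// Hypothetical stand-in for a task ID; not the real TaskId class.
final class TaskIdSketch {
    final int subtopology;
    final int partition;

    TaskIdSketch(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    // Parses the assumed "<subtopology>_<partition>" form, e.g. "0_1".
    static TaskIdSketch parse(final String text) {
        final int idx = text.indexOf('_');
        if (idx <= 0 || idx == text.length() - 1) {
            throw new IllegalArgumentException("Not a task ID: " + text);
        }
        return new TaskIdSketch(
            Integer.parseInt(text.substring(0, idx)),
            Integer.parseInt(text.substring(idx + 1)));
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}
```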

##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * This function will return a {@link TaskId} with basic task metadata
+     *
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    TaskId taskId();
+
+    /**
+     * This function will return a set of the current TopicPartitions
+     *
+     * @return set of topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * This function will return a map of TopicPartitions and the highest committed offset seen so far
+     *
+     * @return map with an entry for all topic partitions with the committed offset as a value
+     */

Review comment:
       ```suggestion
       /**
        * Offsets of the source topic partitions committed so far by the task.
        *
        * @return map from source topic partitions to committed offsets
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * This function will return the name of the Thread
+     *
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     *
+     * @return a set of metadata for the active tasks
+     */

Review comment:
       ```suggestion
       /**
        * Metadata of the active tasks assigned to the stream thread.
        *
        * @return metadata of the active tasks
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * This function will return the name of the Thread
+     *
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     *
+     * @return a set of metadata for the active tasks
+     */
+    Set<TaskMetadata> activeTasks();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
+     *
+     * @return a set of metadata for the standby tasks
+     */
+    Set<TaskMetadata> standbyTasks();
+
+    /**
+     * This function will return the Client Id for the consumer
+     *
+     * @return the consumer Client Id
+     */
+    String consumerClientId();
+
+    /**
+     * This function will return the Client id for the restore consumer
+     *
+     * @return the restore consumer Client Id
+     */
+    String restoreConsumerClientId();
+
+    /**
+     * This function will return the set of Client Ids for the producers
+     *
+     * @return set of producer Client Ids
+     */

Review comment:
       ```suggestion
       /**
        * Client IDs of the Kafka producers used by the stream thread.
        *
        * @return client IDs of the Kafka producers
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    Set<TopicPartition> standbyTopicPartitions();
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    Set<String> standbyStateStoreNames();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().host();}
+     *
+     * @return the host where the given process runs
+     */
+    String host();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().port();}
+     *
+     * @return the port number where the given process runs
+     */

Review comment:
       ```suggestion
       /**
        * Port on which the Streams client listens.
        * 
        * This method is equivalent to {@code StreamsMetadata.hostInfo().port();}
        *
        * @return the port on which Streams client listens
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */

Review comment:
       ```suggestion
       /**
        * Names of the state stores assigned to active tasks of the Streams client.
        *
        * @return names of the state stores assigned to active tasks
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    Set<TopicPartition> standbyTopicPartitions();
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */

Review comment:
       ```suggestion
       /**
        * Names of the state stores assigned to standby tasks of the Streams client.
        *
        * @return names of the state stores assigned to standby tasks
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */

Review comment:
       ```suggestion
       /**
        * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the Streams
        * client.
        *
        * @return {@link HostInfo} corresponding to the Streams client
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    Set<TopicPartition> standbyTopicPartitions();
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    Set<String> standbyStateStoreNames();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().host();}
+     *
+     * @return the host where the given process runs
+     */

Review comment:
       ```suggestion
       /**
        * Host where the Streams client runs. 
        *
        * This method is equivalent to {@code StreamsMetadata.hostInfo().host();}
        *
        * @return the host where the Streams client runs
        */
   ```
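
The stated equivalence between `host()`/`port()` and `hostInfo().host()`/`hostInfo().port()` could be sketched with default methods, as below. This is only an illustration: the types `HostInfoSketch` and `StreamsMetadataSketch` are hypothetical, and using default methods is an assumption of this sketch, not something the PR states.

```java
// Hypothetical stand-in for org.apache.kafka.streams.state.HostInfo.
final class HostInfoSketch {
    private final String host;
    private final int port;

    HostInfoSketch(final String host, final int port) {
        this.host = host;
        this.port = port;
    }

    String host() { return host; }

    int port() { return port; }
}

// Hypothetical interface: host() and port() delegate to hostInfo(),
// so the two ways of reading the endpoint cannot drift apart.
interface StreamsMetadataSketch {
    HostInfoSketch hostInfo();

    default String host() { return hostInfo().host(); }

    default int port() { return hostInfo().port(); }
}
```

With this shape, an implementation only has to supply `hostInfo()`, and both convenience accessors follow from it.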

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+/**
+ * Represents the state of the different a given Kafka Streams instance running within a {@link KafkaStreams} application.
+ */
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */

Review comment:
       ```suggestion
       /**
        * Changelog topic partitions for the state stores that the standby tasks of the Streams client replicate.
        *
        * @return set of changelog topic partitions of the standby tasks
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * This function will return a {@link TaskId} with basic task metadata
+     *
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    TaskId taskId();
+
+    /**
+     * This function will return a set of the current TopicPartitions
+     *
+     * @return set of topic partitions
+     */

Review comment:
       ```suggestion
       /**
        * Source topic partitions of the task.
        *
        * @return source topic partitions
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * This function will return the name of the Thread
+     *
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     *
+     * @return a set of metadata for the active tasks
+     */
+    Set<TaskMetadata> activeTasks();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
+     *
+     * @return a set of metadata for the standby tasks
+     */

Review comment:
       ```suggestion
       /**
        * Metadata of the standby tasks assigned to the stream thread.
        *
        * @return metadata of the standby tasks
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */

Review comment:
       ```suggestion
       /**
        * State of the stream thread
        *
        * @return the state
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * This function will return a {@link TaskId} with basic task metadata
+     *
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    TaskId taskId();
+
+    /**
+     * This function will return a set of the current TopicPartitions
+     *
+     * @return set of topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * This function will return a map of TopicPartitions and the highest committed offset seen so far
+     *
+     * @return map with an entry for all topic partitions with the committed offset as a value
+     */
+    Map<TopicPartition, Long> committedOffsets();
+
+    /**
+     * This function will return a map of TopicPartitions and the highest offset seen so far in the Topic
+     *
+     * @return map with an entry for all topic partitions with the highest offset as a value
+     */

Review comment:
       ```suggestion
       /**
        * End offsets of the source topic partitions of the task.
        *
        * @return map from source topic partition to end offset
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * This function will return the name of the Thread
+     *
+     * @return the name of the Thread
+     */

Review comment:
       ```suggestion
       /**
        * Name of the stream thread
        *
        * @return the name
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */

Review comment:
       ```suggestion
   /**
    * Metadata of a task.
    */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * This function will return the name of the Thread
+     *
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     *
+     * @return a set of metadata for the active tasks
+     */
+    Set<TaskMetadata> activeTasks();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
+     *
+     * @return a set of metadata for the standby tasks
+     */
+    Set<TaskMetadata> standbyTasks();
+
+    /**
+     * This function will return the Client Id for the consumer
+     *
+     * @return the consumer Client Id
+     */
+    String consumerClientId();
+
+    /**
+     * This function will return the Client id for the restore consumer
+     *
+     * @return the restore consumer Client Id
+     */

Review comment:
       ```suggestion
       /**
        * Client ID of the restore Kafka consumer used by the stream thread
        *
        * @return client ID of the restore Kafka consumer
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * This function will return the name of the Thread
+     *
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     *
+     * @return a set of metadata for the active tasks
+     */
+    Set<TaskMetadata> activeTasks();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
+     *
+     * @return a set of metadata for the standby tasks
+     */
+    Set<TaskMetadata> standbyTasks();
+
+    /**
+     * This function will return the Client Id for the consumer
+     *
+     * @return the consumer Client Id
+     */
+    String consumerClientId();
+
+    /**
+     * This function will return the Client id for the restore consumer
+     *
+     * @return the restore consumer Client Id
+     */
+    String restoreConsumerClientId();
+
+    /**
+     * This function will return the set of Client Ids for the producers
+     *
+     * @return set of producer Client Ids
+     */
+    Set<String> producerClientIds();
+
+    /**
+     * This function will return the Client Id for the admin client
+     *
+     * @return the admin Client Id
+     */

Review comment:
       ```suggestion
       /**
        * Client ID of the admin client used by the stream thread.
        *
        * @return client ID of the admin client
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * This function will return a {@link TaskId} with basic task metadata
+     *
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    TaskId taskId();
+
+    /**
+     * This function will return a set of the current TopicPartitions
+     *
+     * @return set of topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * This function will return a map of TopicPartitions and the highest committed offset seen so far
+     *
+     * @return map with an entry for all topic partitions with the committed offset as a value
+     */
+    Map<TopicPartition, Long> committedOffsets();
+
+    /**
+     * This function will return a map of TopicPartitions and the highest offset seen so far in the Topic
+     *
+     * @return map with an entry for all topic partitions with the highest offset as a value
+     */
+    Map<TopicPartition, Long> endOffsets();
+
+    /**
+     * This function will return the time task idling started, if the task is not currently idling it will return empty
+     *
+     * @return A filled {@code Optional} with the time where task idling started, and empty {@code Optional} otherwise
+     */

Review comment:
       ```suggestion
       /**
        * Time task idling started. If the task is not currently idling it will return empty.
        *
        * @return time when task idling started, empty {@code Optional} if the task is currently not idling
        */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */

Review comment:
       ```suggestion
   /**
    * Metadata of a stream thread.
    */
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * This function will return the state of the Thread
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * This function will return the name of the Thread
+     *
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     *
+     * @return a set of metadata for the active tasks
+     */
+    Set<TaskMetadata> activeTasks();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
+     *
+     * @return a set of metadata for the standby tasks
+     */
+    Set<TaskMetadata> standbyTasks();
+
+    /**
+     * This function will return the Client Id for the consumer
+     *
+     * @return the consumer Client Id
+     */

Review comment:
       ```suggestion
       /**
        * Client ID of the Kafka consumer used by the stream thread.
        *
        * @return client ID of the Kafka consumer
        */
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-866866991


   Needed to rebase as there were some conflicts with trunk, hence the force push.
   
   I applied the changes in separate commits:
   - One for method renames, formatting, and small refactors (like the unmodifiable collections)
   - One for adding tests
   - One to fix the regression caused by the unmodifiable collections in the constructor.
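
The thread does not say what the regression actually was. Purely as a general illustration of how the wrapping behaves, the sketch below shows two properties of `Collections.unmodifiableSet` that can surprise existing callers: the wrapper rejects mutation, and it is a view over the original set rather than a copy. The class and method names are hypothetical and are not part of the PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not taken from the PR.
final class UnmodifiableViewSketch {

    // Returns true if adding to the wrapped set is rejected.
    static boolean mutationRejected() {
        final Set<String> view = Collections.unmodifiableSet(new HashSet<>());
        try {
            view.add("x");
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }

    // Returns true if a later change to the backing set is visible through the wrapper.
    static boolean backingChangesVisible() {
        final Set<String> backing = new HashSet<>();
        final Set<String> view = Collections.unmodifiableSet(backing);
        backing.add("task-0_0");
        return view.contains("task-0_0");
    }
}
```

If a stable snapshot is wanted, copying first (for example `Collections.unmodifiableSet(new HashSet<>(input))`) avoids the second effect.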





[GitHub] [kafka] ableegoldman commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657560587



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer<StreamThread> consumer) {
         for (final StreamThread thread : copy) consumer.accept(thread);
     }
 
+    /**
+     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+     *
+     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
+     * @deprecated since 3.0 use {@link #threadsMetadata()}
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
+        return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
+                threadMetadata.threadName(),
+                threadMetadata.threadState(),
+                threadMetadata.consumerClientId(),
+                threadMetadata.restoreConsumerClientId(),
+                threadMetadata.producerClientIds(),
+                threadMetadata.adminClientId(),
+                threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet()),
+                threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet())))
+                .collect(Collectors.toSet());
+    }
+
     /**
      * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
      *
      * @return the set of {@link ThreadMetadata}.
      */
-    public Set<ThreadMetadata> localThreadsMetadata() {
+    public Set<ThreadMetadata> threadsMetadata() {

Review comment:
       @cadonna `localThreadMetadata` still sounds more correct to me than `localThreadsMetadata`. I really can't explain it other than to say that English is weird, and names/titles like this do not always follow the regular rules of grammar/plurals 🤷‍♀️ 
   
   But actually I like your suggestion `metadataForLocalThreads()` even better than any of them, SGTM







[GitHub] [kafka] cadonna commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-867665743


   > Regarding the SourceConnectorsIntegrationTest failing in Jenkins, do you agree that is unrelated?
   
   Yes, I agree!





[GitHub] [kafka] cadonna merged pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
cadonna merged pull request #10840:
URL: https://github.com/apache/kafka/pull/10840


   





[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-861246389


   Following Konstantine's recommendation on the mailing list regarding the release dates for Kafka 3.0.0, I'm pinging potential reviewers again for this KIP's implementation.
   @ableegoldman (as creator of the original Jira ticket), @cadonna and @guozhangwang (as voters on the KIP), I would highly appreciate your feedback on this PR.
   
   Thanks a lot!





[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r656890122



##########
File path: streams/src/main/java/org/apache/kafka/streams/TaskMetadata.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+
+/**
+ * Represents the state of a single task running within a {@link KafkaStreams} application.
+ */
+public interface TaskMetadata {
+
+    /**
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    TaskId taskId();
+
+    /**
+     * This function will return a set of the current TopicPartitions
+     */
+    Set<TopicPartition> topicPartitions();

Review comment:
       Could you please use javadoc mark-up like `@return` and `@param` for the docs? Here and for the other methods.
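
As a rough illustration of the requested mark-up, the sketch below documents one method with `@return` only and one with both `@param` and `@return`. `ExampleMetadata`, `FixedExampleMetadata`, and their methods are hypothetical names made up for this example. They are not part of the Kafka Streams API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical interface used only to illustrate the Javadoc tags.
 */
interface ExampleMetadata {

    /**
     * Names of the partitions known to this instance.
     *
     * @return set of partition names
     */
    Set<String> partitionNames();

    /**
     * Offset recorded for the given partition.
     *
     * @param partitionName name of the partition to look up
     * @return the recorded offset, or an empty {@code Optional} if the partition is unknown
     */
    Optional<Long> offsetFor(String partitionName);
}

// Trivial in-memory implementation so the sketch can be exercised.
final class FixedExampleMetadata implements ExampleMetadata {
    private final Map<String, Long> offsets;

    FixedExampleMetadata(final Map<String, Long> offsets) {
        this.offsets = Collections.unmodifiableMap(offsets);
    }

    @Override
    public Set<String> partitionNames() {
        return offsets.keySet();
    }

    @Override
    public Optional<Long> offsetFor(final String partitionName) {
        return Optional.ofNullable(offsets.get(partitionName));
    }
}
```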

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     */
+    Set<TaskMetadata> activeTasks();

Review comment:
       Could you use javadoc mark-up for the docs? Here and for the other methods.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+    private final String threadName;
+
+    private final String threadState;
+
+    private final Set<TaskMetadata> activeTasks;
+
+    private final Set<TaskMetadata> standbyTasks;
+
+    private final String mainConsumerClientId;
+
+    private final String restoreConsumerClientId;
+
+    private final Set<String> producerClientIds;
+
+    // the admin client should be shared among all threads, so the client id should be the same;
+    // we keep it at the thread-level for user's convenience and possible extensions in the future
+    private final String adminClientId;
+
+    public ThreadMetadataImpl(final String threadName,
+                              final String threadState,
+                              final String mainConsumerClientId,
+                              final String restoreConsumerClientId,
+                              final Set<String> producerClientIds,
+                              final String adminClientId,
+                              final Set<TaskMetadata> activeTasks,
+                              final Set<TaskMetadata> standbyTasks) {
+        this.mainConsumerClientId = mainConsumerClientId;
+        this.restoreConsumerClientId = restoreConsumerClientId;
+        this.producerClientIds = producerClientIds;
+        this.adminClientId = adminClientId;
+        this.threadName = threadName;
+        this.threadState = threadState;
+        this.activeTasks = Collections.unmodifiableSet(activeTasks);
+        this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
+    }
+
+
+    public String threadState() {
+        return threadState;
+    }
+
+    public String threadName() {
+        return threadName;
+    }
+
+
+    public Set<TaskMetadata> activeTasks() {
+        return activeTasks;
+    }
+
+    public Set<TaskMetadata> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public String consumerClientId() {
+        return mainConsumerClientId;
+    }
+
+    public String restoreConsumerClientId() {
+        return restoreConsumerClientId;
+    }
+
+    public Set<String> producerClientIds() {
+        return producerClientIds;
+    }
+
+    public String adminClientId() {
+        return adminClientId;
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       Could you add unit tests for `equals()` and `hashCode()`? Since we guarantee a behavior on the interface, it makes sense to verify that the implementation behaves correctly.
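
       As a rough illustration of the kind of check requested here, the sketch below exercises the equals()/hashCode() contract on a hypothetical stand-in class. `SampleThreadMetadata` and `EqualsHashCodeChecks` are assumptions made for the example and are not classes from this PR; in the Kafka code base the same checks would presumably be written as JUnit assertions against the real implementation.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a metadata implementation; not the class from the PR.
final class SampleThreadMetadata {
    private final String threadName;
    private final Set<String> producerClientIds;

    SampleThreadMetadata(final String threadName, final Set<String> producerClientIds) {
        this.threadName = threadName;
        // Copy and wrap so that neither the caller nor this class can change the contents later.
        this.producerClientIds = Collections.unmodifiableSet(new HashSet<>(producerClientIds));
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final SampleThreadMetadata that = (SampleThreadMetadata) o;
        return threadName.equals(that.threadName)
            && producerClientIds.equals(that.producerClientIds);
    }

    @Override
    public int hashCode() {
        return Objects.hash(threadName, producerClientIds);
    }
}

final class EqualsHashCodeChecks {
    // Returns true when equal objects agree on equals() and hashCode(),
    // and objects that differ in one field (or null) are reported as unequal.
    static boolean contractHolds() {
        final Set<String> ids = new HashSet<>(Collections.singleton("producer-1"));
        final SampleThreadMetadata a = new SampleThreadMetadata("thread-1", ids);
        final SampleThreadMetadata b = new SampleThreadMetadata("thread-1", ids);
        final SampleThreadMetadata differentName = new SampleThreadMetadata("thread-2", ids);

        final boolean equalObjectsMatch = a.equals(b) && b.equals(a) && a.hashCode() == b.hashCode();
        final boolean differingFieldDetected = !a.equals(differentName);
        final boolean nullHandled = !a.equals(null);
        return equalObjectsMatch && differingFieldDetected && nullHandled;
    }
}
```

       The sketch deliberately does not assert that unequal objects have different hash codes, since the contract does not require that.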

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    Set<TopicPartition> standbyTopicPartitions();
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    Set<String> standbyStateStoreNames();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().host();}
+     */
+    String host();

Review comment:
       Could you describe the return value with `@return`? Please also check whether the other methods use appropriate Javadoc mark-up.
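
       A minimal sketch of the mark-up being asked for, using a hypothetical `InstanceEndpoint` interface and `FixedEndpoint` class that are not part of this PR. The wording of the doc comments is an assumption; only the use of `@return` and `{@code ...}` is the point.

```java
// Hypothetical interface used only to illustrate the Javadoc mark-up.
interface InstanceEndpoint {

    /**
     * Returns the host name this instance is reachable at.
     * <p>
     * Equivalent to reading the host part of the configured {@code host:port} pair.
     *
     * @return the host name; never {@code null} in this sketch
     */
    String host();

    /**
     * Returns the port this instance listens on.
     *
     * @return the port number
     */
    int port();
}

// Trivial implementation so that the interface can be exercised.
final class FixedEndpoint implements InstanceEndpoint {
    private final String host;
    private final int port;

    FixedEndpoint(final String host, final int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public String host() {
        return host;
    }

    @Override
    public int port() {
        return port;
    }
}
```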

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+public class TaskMetadataImpl implements TaskMetadata {
+
+    private final TaskId taskId;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private final Optional<Long> timeCurrentIdlingStarted;
+
+    public TaskMetadataImpl(final TaskId taskId,
+                            final Set<TopicPartition> topicPartitions,
+                            final Map<TopicPartition, Long> committedOffsets,
+                            final Map<TopicPartition, Long> endOffsets,
+                            final Optional<Long> timeCurrentIdlingStarted) {
+        this.taskId = taskId;
+        this.topicPartitions = topicPartitions;
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;
+        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets() {
+        return endOffsets;
+    }
+
+    @Override
+    public Optional<Long> timeCurrentIdlingStarted() {
+        return timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       Could you add unit tests for `equals()` and `hashCode()`? Since we guarantee a behavior on the interface, it makes sense to verify that the implementation behaves correctly.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -27,10 +27,12 @@
 
 /**
  * Represents the state of a single task running within a {@link KafkaStreams} application.
+ * @deprecated since 3.0, not intended for public use, use {@link org.apache.kafka.streams.TaskMetadata} instead.
  */
+@Deprecated

Review comment:
       nit: If we deprecate the class, we do not need to deprecate the constructor anymore, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1478,8 +1499,36 @@ public void cleanUp() {
      * @param storeName the {@code storeName} to find metadata for
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
      * this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead
      */
-    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
+     * <ul>
+     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @param storeName the {@code storeName} to find metadata for
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
+     * this application
+     */
+    public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {

Review comment:
       I am +1 for `streamsMetadataForStore`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet());
+
+    private final HostInfo hostInfo;
+
+    private final Set<String> stateStoreNames;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Set<String> standbyStateStoreNames;
+
+    private final Set<TopicPartition> standbyTopicPartitions;
+
+    public StreamsMetadataImpl(final HostInfo hostInfo,
+                               final Set<String> stateStoreNames,
+                               final Set<TopicPartition> topicPartitions,
+                               final Set<String> standbyStateStoreNames,
+                               final Set<TopicPartition> standbyTopicPartitions) {
+
+        this.hostInfo = hostInfo;
+        this.stateStoreNames = stateStoreNames;
+        this.topicPartitions = topicPartitions;
+        this.standbyTopicPartitions = standbyTopicPartitions;
+        this.standbyStateStoreNames = standbyStateStoreNames;
+    }
+
+    /**
+     * The value of {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    @Override
+    public HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    @Override
+    public Set<String> stateStoreNames() {
+        return Collections.unmodifiableSet(stateStoreNames);
+    }
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    @Override
+    public Set<TopicPartition> topicPartitions() {
+        return Collections.unmodifiableSet(topicPartitions);
+    }
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    @Override
+    public Set<TopicPartition> standbyTopicPartitions() {
+        return Collections.unmodifiableSet(standbyTopicPartitions);
+    }
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    @Override
+    public Set<String> standbyStateStoreNames() {
+        return Collections.unmodifiableSet(standbyStateStoreNames);
+    }
+
+    @Override
+    public String host() {
+        return hostInfo.host();
+    }
+
+    @SuppressWarnings("unused")
+    @Override
+    public int port() {
+        return hostInfo.port();
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       Could you add unit tests for `equals()` and `hashCode()`? Since we guarantee a behavior on the interface, it makes sense to verify that the implementation behaves correctly.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+public class TaskMetadataImpl implements TaskMetadata {
+
+    private final TaskId taskId;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private final Optional<Long> timeCurrentIdlingStarted;
+
+    public TaskMetadataImpl(final TaskId taskId,
+                            final Set<TopicPartition> topicPartitions,
+                            final Map<TopicPartition, Long> committedOffsets,
+                            final Map<TopicPartition, Long> endOffsets,
+                            final Optional<Long> timeCurrentIdlingStarted) {
+        this.taskId = taskId;
+        this.topicPartitions = topicPartitions;
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;

Review comment:
       Since this class is read only, it makes sense to use `Collections.unmodifiable*()` methods here to avoid modification of the contents of the member fields.
   Unit tests as in `StreamsMetadataTest` would also be good. 
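
       The suggestion above could look roughly like the following sketch, which wraps the collections once in the constructor and then checks that callers cannot modify them. `ReadOnlyTaskView` is a hypothetical class for illustration, with `String` keys standing in for `TopicPartition`; it is not the implementation from this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical read-only holder; String keys stand in for TopicPartition.
final class ReadOnlyTaskView {
    private final Set<String> partitions;
    private final Map<String, Long> committedOffsets;

    ReadOnlyTaskView(final Set<String> partitions, final Map<String, Long> committedOffsets) {
        // Wrap once here so that the fields are unmodifiable for callers and for this class alike.
        this.partitions = Collections.unmodifiableSet(partitions);
        this.committedOffsets = Collections.unmodifiableMap(committedOffsets);
    }

    Set<String> partitions() {
        return partitions;
    }

    Map<String, Long> committedOffsets() {
        return committedOffsets;
    }

    // Returns true when both accessors reject modification attempts.
    static boolean rejectsMutation() {
        final ReadOnlyTaskView view = new ReadOnlyTaskView(new HashSet<>(), new HashMap<>());
        try {
            view.partitions().add("topic-0");
            return false; // the add should not have succeeded
        } catch (final UnsupportedOperationException expected) {
            // expected: continue with the map check
        }
        try {
            view.committedOffsets().put("topic-0", 1L);
            return false; // the put should not have succeeded
        } catch (final UnsupportedOperationException expected) {
            return true;
        }
    }
}
```

       Note that the `Collections.unmodifiable*()` wrappers are views: a caller that keeps a reference to the original collection can still change what the view shows. Copying before wrapping would close that gap, at the cost of an extra allocation.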

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public class ThreadMetadataImpl implements ThreadMetadata {
+
+    private final String threadName;
+
+    private final String threadState;
+
+    private final Set<TaskMetadata> activeTasks;
+
+    private final Set<TaskMetadata> standbyTasks;
+
+    private final String mainConsumerClientId;
+
+    private final String restoreConsumerClientId;
+
+    private final Set<String> producerClientIds;
+
+    // the admin client should be shared among all threads, so the client id should be the same;
+    // we keep it at the thread-level for user's convenience and possible extensions in the future
+    private final String adminClientId;
+
+    public ThreadMetadataImpl(final String threadName,
+                              final String threadState,
+                              final String mainConsumerClientId,
+                              final String restoreConsumerClientId,
+                              final Set<String> producerClientIds,
+                              final String adminClientId,
+                              final Set<TaskMetadata> activeTasks,
+                              final Set<TaskMetadata> standbyTasks) {
+        this.mainConsumerClientId = mainConsumerClientId;
+        this.restoreConsumerClientId = restoreConsumerClientId;
+        this.producerClientIds = producerClientIds;

Review comment:
       I think you should also use `Collections.unmodifiableSet()` here.
   Also unit tests to verify the immutability as in `StreamsMetadataTest` would be great!

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -1972,12 +1971,6 @@ public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
         assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
     }
 
-    private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata metadata, final StreamThread.State state) {

Review comment:
       Thank you for the clean up!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet());
+
+    private final HostInfo hostInfo;
+
+    private final Set<String> stateStoreNames;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Set<String> standbyStateStoreNames;
+
+    private final Set<TopicPartition> standbyTopicPartitions;
+
+    public StreamsMetadataImpl(final HostInfo hostInfo,
+                               final Set<String> stateStoreNames,
+                               final Set<TopicPartition> topicPartitions,
+                               final Set<String> standbyStateStoreNames,
+                               final Set<TopicPartition> standbyTopicPartitions) {
+
+        this.hostInfo = hostInfo;
+        this.stateStoreNames = stateStoreNames;
+        this.topicPartitions = topicPartitions;
+        this.standbyTopicPartitions = standbyTopicPartitions;
+        this.standbyStateStoreNames = standbyStateStoreNames;
+    }
+
+    /**
+     * The value of {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    @Override
+    public HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    @Override
+    public Set<String> stateStoreNames() {
+        return Collections.unmodifiableSet(stateStoreNames);

Review comment:
       I think it would be better to make the collections immutable in the constructor since they should also not be modified within this class (for now). 

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer<StreamThread> consumer) {
         for (final StreamThread thread : copy) consumer.accept(thread);
     }
 
+    /**
+     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+     *
+     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
+     * @deprecated since 3.0 use {@link #threadsMetadata()}
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
+        return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
+                threadMetadata.threadName(),
+                threadMetadata.threadState(),
+                threadMetadata.consumerClientId(),
+                threadMetadata.restoreConsumerClientId(),
+                threadMetadata.producerClientIds(),
+                threadMetadata.adminClientId(),
+                threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet()),
+                threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet())))
+                .collect(Collectors.toSet());
+    }
+
     /**
      * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
      *
      * @return the set of {@link ThreadMetadata}.
      */
-    public Set<ThreadMetadata> localThreadsMetadata() {
+    public Set<ThreadMetadata> threadsMetadata() {

Review comment:
       nit: I think `localThreadMetadata` does not sound consistent since metadata of multiple stream threads is returned. What about `metadataForLocalThreads()`?

##########
File path: docs/streams/upgrade-guide.html
##########
@@ -121,10 +121,23 @@ <h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API
     <p>
         The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
         (which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
-        and will be removed, as they were never intended for public use. Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as well as the <code>TaskMetadata</code> constructor.
-        These have been replaced with APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new <code>TaskMetadata#getTaskId</code>
-        method. See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more details.
+        and will be removed, as they were never intended for public use. We have also deprecated the <code>org.apache.kafka.streams.processor.TaskMetadata</code> class and introduced a new interface
+        <code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
+        of the Kafka codebase.
+        Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new
+        <code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#getActiveTasks</code> and <code>ThreadMetadata#getStandbyTasks</code>.

Review comment:
       ```suggestion
           <code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1458,8 +1457,30 @@ public void cleanUp() {
      * Note: this is a point in time view and it may change due to partition reassignment.
      *
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata}
      */
-    public Collection<StreamsMetadata> allMetadata() {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
+     * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     */
+    public Collection<StreamsMetadata> allRunningMetadata() {

Review comment:
       What about `metadataForAllStreamsClients()`? I think this makes it more explicit.







[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-868513219


   @cadonna I accepted your suggestions. Thanks a bunch!





[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657131881



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1458,8 +1457,30 @@ public void cleanUp() {
      * Note: this is a point in time view and it may change due to partition reassignment.
      *
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata}
      */
-    public Collection<StreamsMetadata> allMetadata() {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
+     * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     */
+    public Collection<StreamsMetadata> allRunningMetadata() {

Review comment:
       I decided to go for the pattern xxxForxxx to keep the naming consistent across the different changes.
   
   It is now `metadataForAllStreamsClients` but happy to change if anyone has reasons against it.







[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657132064



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer<StreamThread> consumer) {
         for (final StreamThread thread : copy) consumer.accept(thread);
     }
 
+    /**
+     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+     *
+     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
+     * @deprecated since 3.0 use {@link #threadsMetadata()}
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
+        return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
+                threadMetadata.threadName(),
+                threadMetadata.threadState(),
+                threadMetadata.consumerClientId(),
+                threadMetadata.restoreConsumerClientId(),
+                threadMetadata.producerClientIds(),
+                threadMetadata.adminClientId(),
+                threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet()),
+                threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet())))
+                .collect(Collectors.toSet());
+    }
+
     /**
      * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
      *
      * @return the set of {@link ThreadMetadata}.
      */
-    public Set<ThreadMetadata> localThreadsMetadata() {
+    public Set<ThreadMetadata> threadsMetadata() {

Review comment:
       I decided to go for the pattern xxxForxxx to keep the naming consistent across the different changes.
   
   It is now `metadataForLocalThreads` but happy to change if anyone has reasons against it.
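
   As a rough illustration of the rename discussed here, the sketch below shows a deprecated method that delegates to its replacement and converts the results back to the legacy type, similar in spirit to the diff above. All types (`ToyStreams`, `NewThreadInfo`, `NewThreadInfoImpl`, `LegacyThreadInfo`) are hypothetical stand-ins and not the Kafka Streams classes; only the two method names are taken from this thread.

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the new public interface.
interface NewThreadInfo {
    String threadName();
}

// Hypothetical internal implementation of the new interface.
final class NewThreadInfoImpl implements NewThreadInfo {
    private final String threadName;

    NewThreadInfoImpl(final String threadName) {
        this.threadName = threadName;
    }

    @Override
    public String threadName() {
        return threadName;
    }
}

// Hypothetical stand-in for the old concrete class kept for compatibility.
final class LegacyThreadInfo {
    private final String threadName;

    LegacyThreadInfo(final String threadName) {
        this.threadName = threadName;
    }

    String threadName() {
        return threadName;
    }
}

// Hypothetical owner of both the old and the new accessor.
final class ToyStreams {
    private final Set<String> threadNames;

    ToyStreams(final Set<String> threadNames) {
        this.threadNames = threadNames;
    }

    /**
     * @deprecated use {@link #metadataForLocalThreads()} instead
     */
    @Deprecated
    Set<LegacyThreadInfo> localThreadsMetadata() {
        // Delegate to the new method and map each result to the legacy type.
        return metadataForLocalThreads().stream()
            .map(info -> new LegacyThreadInfo(info.threadName()))
            .collect(Collectors.toSet());
    }

    Set<NewThreadInfo> metadataForLocalThreads() {
        return threadNames.stream()
            .<NewThreadInfo>map(NewThreadInfoImpl::new)
            .collect(Collectors.toSet());
    }
}
```

   Keeping the conversion inside the deprecated method means existing callers keep compiling while new callers only see the interface.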







[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-868410976


   Hi @cadonna, I applied all the feedback you provided. I did my best to add some extra sentences to the Javadocs for the interfaces. Let me know if they suffice.





[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657138308



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+public class TaskMetadataImplTest {
+
+    public static final TaskId TASK_ID = new TaskId(1, 2);
+    public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
+    public static final Map<TopicPartition, Long> COMMITTED_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L));
+    public static final Map<TopicPartition, Long> END_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L));
+    public static final Optional<Long> TIME_CURRENT_IDLING_STARTED = Optional.of(3L);
+
+    private TaskMetadata taskMetadata;
+
+    @Before
+    public void setUp() {
+        taskMetadata = new TaskMetadataImpl(
+                TASK_ID,
+                TOPIC_PARTITIONS,
+                COMMITTED_OFFSETS,
+                END_OFFSETS,
+                TIME_CURRENT_IDLING_STARTED);
+    }
+
+    @Test
+    public void shouldNotAllowModificationOfInternalStateViaGetters() {
+        assertTrue(isUnmodifiable(taskMetadata.topicPartitions()));
+        assertTrue(isUnmodifiable(taskMetadata.committedOffsets()));
+        assertTrue(isUnmodifiable(taskMetadata.endOffsets()));
+    }
+
+    @Test
+    public void shouldFollowEqualsAndHasCodeContract() {

Review comment:
       @cadonna For hash code and equals contract validation, I decided to keep everything in one single test that validates the positive cases and each of the reasons why it might fall into the negative case.
   Please note that objects differing only in committed offsets, end offsets, and/or the time current idling started will be considered equal.
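
   To make the equality rule above concrete, here is a minimal sketch using a hypothetical toy class (`ToyTaskMetadata` is not the `TaskMetadataImpl` from this PR, and its fields are simplified to strings and a single offset). It assumes that only the task id and the topic partitions take part in `equals` and `hashCode`, while the offset is ignored.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical toy class illustrating equality on a subset of fields.
final class ToyTaskMetadata {
    private final String taskId;
    private final Set<String> topicPartitions;
    private final long committedOffset; // deliberately excluded from equals/hashCode

    ToyTaskMetadata(final String taskId, final Set<String> topicPartitions, final long committedOffset) {
        this.taskId = taskId;
        this.topicPartitions = Collections.unmodifiableSet(new HashSet<>(topicPartitions));
        this.committedOffset = committedOffset;
    }

    long committedOffset() {
        return committedOffset;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ToyTaskMetadata)) {
            return false;
        }
        final ToyTaskMetadata other = (ToyTaskMetadata) o;
        // Only the identifying fields are compared; the offset is ignored.
        return taskId.equals(other.taskId) && topicPartitions.equals(other.topicPartitions);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals to honor the contract.
        return Objects.hash(taskId, topicPartitions);
    }
}
```

   With this definition, two instances with the same task id and partitions but different offsets compare equal and share a hash code, whereas a different task id makes them unequal.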

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
##########
@@ -55,6 +62,63 @@ public void shouldNotAllowModificationOfInternalStateViaGetters() {
         assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
     }
 
+    @Test
+    public void shouldFollowHashCodeAndEqualsContract() {

Review comment:
       @cadonna For hash code and equals contract validation, I decided to keep it under one single test, and validate the positive case and any of the reasons why it might fall into the negative case.
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class ThreadMetadataImplTest {
+
+    public static final String THREAD_NAME = "thread name";
+    public static final String THREAD_STATE = "thread state";
+    public static final String MAIN_CONSUMER_CLIENT_ID = "main Consumer ClientID";
+    public static final String RESTORE_CONSUMER_CLIENT_ID = "restore Consumer ClientID";
+    public static final String CLIENT_ID_1 = "client Id 1";
+    public static final String CLIENT_ID_2 = "client Id 2";
+    public static final Set<String> PRODUCER_CLIENT_IDS = mkSet(CLIENT_ID_1, CLIENT_ID_2);
+    public static final TaskId TASK_ID_0 = new TaskId(1, 2);
+    public static final TaskId TASK_ID_1 = new TaskId(1, 1);
+    public static final TopicPartition TP_0_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1_0 = new TopicPartition("t", 1);
+    public static final TopicPartition TP_0_1 = new TopicPartition("t", 2);
+    public static final TopicPartition TP_1_1 = new TopicPartition("t", 3);
+    public static final TaskMetadata TM_0 = new TaskMetadataImpl(
+            TASK_ID_0,
+            mkSet(TP_0_0, TP_1_0),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            Optional.of(3L));
+    public static final TaskMetadata TM_1 = new TaskMetadataImpl(
+            TASK_ID_1,
+            mkSet(TP_0_1, TP_1_1),
+            mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+            mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+            Optional.of(3L));
+    public static final Set<TaskMetadata> STANDBY_TASKS = mkSet(TM_0, TM_1);
+    public static final Set<TaskMetadata> ACTIVE_TASKS = mkSet(TM_1);
+    public static final String ADMIN_CLIENT_ID = "admin ClientID";
+
+    private ThreadMetadata threadMetadata;
+
+    @Before
+    public void setUp() {
+        threadMetadata = new ThreadMetadataImpl(
+                THREAD_NAME,
+                THREAD_STATE,
+                MAIN_CONSUMER_CLIENT_ID,
+                RESTORE_CONSUMER_CLIENT_ID,
+                PRODUCER_CLIENT_IDS,
+                ADMIN_CLIENT_ID,
+                ACTIVE_TASKS,
+                STANDBY_TASKS
+                );
+    }
+
+    @Test
+    public void shouldNotAllowModificationOfInternalStateViaGetters() {
+        assertTrue(isUnmodifiable(threadMetadata.producerClientIds()));
+        assertTrue(isUnmodifiable(threadMetadata.activeTasks()));
+        assertTrue(isUnmodifiable(threadMetadata.standbyTasks()));
+    }
+
+    @Test
+    public void shouldFollowHashCodeAndEqualsContract() {

Review comment:
       @cadonna For hash code and equals contract validation, I decided to keep it under one single test, and validate the positive case and any of the reasons why it might fall into the negative case.







[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-867593067


   Hi @cadonna, I'll try to address the comments either today or tomorrow.
   
   Regarding the `SourceConnectorsIntegrationTest` failing in Jenkins, do you agree that it is unrelated?





[GitHub] [kafka] cadonna commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657813645



##########
File path: docs/streams/upgrade-guide.html
##########
@@ -121,10 +121,23 @@ <h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API
     <p>
         The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
         (which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
-        and will be removed, as they were never intended for public use. Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as well as the <code>TaskMetadata</code> constructor.
-        These have been replaced with APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new <code>TaskMetadata#getTaskId</code>
-        method. See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more details.
+        and will be removed, as they were never intended for public use. We have also deprecated the <code>org.apache.kafka.streams.processsor.TaskMetadata</code> class and introduced a new interface
+        <code>org.apache.kafka.streams.TaskMetadata</code> to be used instead. This change was introduced to better reflect the fact that <code>TaskMetadata</code> was not meant to be instantiated outside
+        of Kafka codebase.
+        Please note that the new <code>TaskMetadata</code> offers APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new
+        <code>org.apache.kafka.streams.TaskMetadata</code> which offers these better methods, for example, by using the new <code>ThreadMetadata#activeTasks</code> and <code>ThreadMetadata#standbyTasks</code>.
+        <code>org.apache.kafka.streams.processor.ThreadMetadata</code> class is also now deprecated and the newly introduced interface <code>org.apache.kafka.streams.ThreadMetadata</code> is to be used instead. In this new <code>ThreadMetadata</code>
+        interface, any reference to the deprecated <code>TaskMetadata</code> is replaced by the new interface.
+        Finally, also <code>org.apache.kafka.streams.state.StreamsMetadata</code> has been deprecated. Please migrate to the new <code>org.apache.kafka.streams.StreamsMetadata</code>.
+        We have deprecated several methods under <code>org.apache.kafka.streams.KafkaStreams</code> that returned the aforementioned deprecated classes:
     </p>
+    <ul>
+        <li>Users of <code>KafkaStreams#allMetadata</code> are meant to migrate to the new <code>KafkaStreams#allRunningMetadata</code>.</li>
+        <li>Users of <code>KafkaStreams#allMetadataForStore(String)</code> are meant to migrate to the new <code>KafkaStreams#allMetadataForGivenStore(String)</code>.</li>
+        <li>Users of <code>KafkaStreams#localThreadsMetadata</code> are meant to migrate to the new <code>KafkaStreams#threadsMetadata</code>.</li>

Review comment:
       You need to adapt this text to the new method names.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+public class TaskMetadataImplTest {
+
+    public static final TaskId TASK_ID = new TaskId(1, 2);
+    public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
+    public static final Map<TopicPartition, Long> COMMITTED_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L));
+    public static final Map<TopicPartition, Long> END_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L));
+    public static final Optional<Long> TIME_CURRENT_IDLING_STARTED = Optional.of(3L);
+
+    private TaskMetadata taskMetadata;
+
+    @Before
+    public void setUp() {
+        taskMetadata = new TaskMetadataImpl(
+                TASK_ID,
+                TOPIC_PARTITIONS,
+                COMMITTED_OFFSETS,
+                END_OFFSETS,
+                TIME_CURRENT_IDLING_STARTED);

Review comment:
       nit:
   ```suggestion
           taskMetadata = new TaskMetadataImpl(
               TASK_ID,
               TOPIC_PARTITIONS,
               COMMITTED_OFFSETS,
               END_OFFSETS,
               TIME_CURRENT_IDLING_STARTED
           );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class ThreadMetadataImplTest {
+
+    public static final String THREAD_NAME = "thread name";
+    public static final String THREAD_STATE = "thread state";
+    public static final String MAIN_CONSUMER_CLIENT_ID = "main Consumer ClientID";
+    public static final String RESTORE_CONSUMER_CLIENT_ID = "restore Consumer ClientID";
+    public static final String CLIENT_ID_1 = "client Id 1";
+    public static final String CLIENT_ID_2 = "client Id 2";
+    public static final Set<String> PRODUCER_CLIENT_IDS = mkSet(CLIENT_ID_1, CLIENT_ID_2);
+    public static final TaskId TASK_ID_0 = new TaskId(1, 2);
+    public static final TaskId TASK_ID_1 = new TaskId(1, 1);
+    public static final TopicPartition TP_0_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1_0 = new TopicPartition("t", 1);
+    public static final TopicPartition TP_0_1 = new TopicPartition("t", 2);
+    public static final TopicPartition TP_1_1 = new TopicPartition("t", 3);
+    public static final TaskMetadata TM_0 = new TaskMetadataImpl(
+            TASK_ID_0,
+            mkSet(TP_0_0, TP_1_0),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            Optional.of(3L));
+    public static final TaskMetadata TM_1 = new TaskMetadataImpl(
+            TASK_ID_1,
+            mkSet(TP_0_1, TP_1_1),
+            mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+            mkMap(mkEntry(TP_0_1, 1L), mkEntry(TP_1_1, 2L)),
+            Optional.of(3L));
+    public static final Set<TaskMetadata> STANDBY_TASKS = mkSet(TM_0, TM_1);
+    public static final Set<TaskMetadata> ACTIVE_TASKS = mkSet(TM_1);
+    public static final String ADMIN_CLIENT_ID = "admin ClientID";
+
+    private ThreadMetadata threadMetadata;
+
+    @Before
+    public void setUp() {
+        threadMetadata = new ThreadMetadataImpl(
+                THREAD_NAME,
+                THREAD_STATE,
+                MAIN_CONSUMER_CLIENT_ID,
+                RESTORE_CONSUMER_CLIENT_ID,
+                PRODUCER_CLIENT_IDS,
+                ADMIN_CLIENT_ID,
+                ACTIVE_TASKS,
+                STANDBY_TASKS
+                );
+    }
+
+    @Test
+    public void shouldNotAllowModificationOfInternalStateViaGetters() {
+        assertTrue(isUnmodifiable(threadMetadata.producerClientIds()));
+        assertTrue(isUnmodifiable(threadMetadata.activeTasks()));
+        assertTrue(isUnmodifiable(threadMetadata.standbyTasks()));
+    }
+
+    @Test
+    public void shouldFollowHashCodeAndEqualsContract() {

Review comment:
       I prefer to have separate tests, because when something fails you immediately know the cause. But if you would like to keep it like that, I am fine with it.
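
   A small sketch of what separate checks could look like, written in plain Java without JUnit so it stays self-contained. `ToyThreadInfo` and `ToyThreadInfoChecks` are hypothetical names, not classes from this PR; the point is only that each check names exactly one reason for inequality.

```java
import java.util.Objects;

// Hypothetical value class with two fields taking part in equality.
final class ToyThreadInfo {
    private final String name;
    private final String state;

    ToyThreadInfo(final String name, final String state) {
        this.name = name;
        this.state = state;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ToyThreadInfo)) {
            return false;
        }
        final ToyThreadInfo other = (ToyThreadInfo) o;
        return name.equals(other.name) && state.equals(other.state);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, state);
    }
}

// One small check per cause, so a failure points at a single field.
final class ToyThreadInfoChecks {
    private static final ToyThreadInfo BASE = new ToyThreadInfo("thread", "RUNNING");

    static boolean shouldBeEqualForSameFields() {
        final ToyThreadInfo same = new ToyThreadInfo("thread", "RUNNING");
        return BASE.equals(same) && BASE.hashCode() == same.hashCode();
    }

    static boolean shouldNotBeEqualIfNameDiffers() {
        return !BASE.equals(new ToyThreadInfo("other", "RUNNING"));
    }

    static boolean shouldNotBeEqualIfStateDiffers() {
        return !BASE.equals(new ToyThreadInfo("thread", "DEAD"));
    }

    public static void main(final String[] args) {
        System.out.println("shouldBeEqualForSameFields: " + shouldBeEqualForSameFields());
        System.out.println("shouldNotBeEqualIfNameDiffers: " + shouldNotBeEqualIfNameDiffers());
        System.out.println("shouldNotBeEqualIfStateDiffers: " + shouldNotBeEqualIfStateDiffers());
    }
}
```

   In a JUnit test class, each of these methods would presumably become its own `@Test` method, so the failing test name alone identifies the field that broke the contract.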

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -229,11 +230,11 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
                                  final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
         if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
             allMetadata = Collections.emptyList();
-            localMetadata.set(new StreamsMetadata(thisHost,
-                                                  Collections.emptySet(),
-                                                  Collections.emptySet(),
-                                                  Collections.emptySet(),
-                                                  Collections.emptySet()
+            localMetadata.set(new StreamsMetadataImpl(thisHost,
+                    Collections.emptySet(),
+                    Collections.emptySet(),
+                    Collections.emptySet(),
+                    Collections.emptySet()
             ));

Review comment:
       nit: it should be 4 not 8 spaces
   ```suggestion
               localMetadata.set(new StreamsMetadataImpl(
                   thisHost,
                   Collections.emptySet(),
                   Collections.emptySet(),
                   Collections.emptySet(),
                   Collections.emptySet()
               ));
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet());

Review comment:
       ```suggestion
       public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(
           HostInfo.unavailable(),
           Collections.emptySet(),
           Collections.emptySet(),
           Collections.emptySet(),
           Collections.emptySet()
       );
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -258,11 +259,11 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
                     standbyStoresOnHost.addAll(getStoresOnHost(storeToSourceTopics, standbyPartitionsOnHost));
                 }
 
-                final StreamsMetadata metadata = new StreamsMetadata(hostInfo,
-                                                                     activeStoresOnHost,
-                                                                     activePartitionsOnHost,
-                                                                     standbyStoresOnHost,
-                                                                     standbyPartitionsOnHost);
+                final StreamsMetadata metadata = new StreamsMetadataImpl(hostInfo,
+                        activeStoresOnHost,
+                        activePartitionsOnHost,
+                        standbyStoresOnHost,
+                        standbyPartitionsOnHost);

Review comment:
       ```suggestion
                   final StreamsMetadata metadata = new StreamsMetadataImpl(
                       hostInfo,
                       activeStoresOnHost,
                       activePartitionsOnHost,
                       standbyStoresOnHost,
                       standbyPartitionsOnHost
                   );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImplTest.java
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+public class ThreadMetadataImplTest {
+
+    public static final String THREAD_NAME = "thread name";
+    public static final String THREAD_STATE = "thread state";
+    public static final String MAIN_CONSUMER_CLIENT_ID = "main Consumer ClientID";
+    public static final String RESTORE_CONSUMER_CLIENT_ID = "restore Consumer ClientID";
+    public static final String CLIENT_ID_1 = "client Id 1";
+    public static final String CLIENT_ID_2 = "client Id 2";
+    public static final Set<String> PRODUCER_CLIENT_IDS = mkSet(CLIENT_ID_1, CLIENT_ID_2);
+    public static final TaskId TASK_ID_0 = new TaskId(1, 2);
+    public static final TaskId TASK_ID_1 = new TaskId(1, 1);
+    public static final TopicPartition TP_0_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1_0 = new TopicPartition("t", 1);
+    public static final TopicPartition TP_0_1 = new TopicPartition("t", 2);
+    public static final TopicPartition TP_1_1 = new TopicPartition("t", 3);
+    public static final TaskMetadata TM_0 = new TaskMetadataImpl(
+            TASK_ID_0,
+            mkSet(TP_0_0, TP_1_0),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
+            Optional.of(3L));

Review comment:
       nit:
   ```suggestion
       public static final TaskMetadata TM_0 = new TaskMetadataImpl(
           TASK_ID_0,
           mkSet(TP_0_0, TP_1_0),
           mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
           mkMap(mkEntry(TP_0_0, 1L), mkEntry(TP_1_0, 2L)),
           Optional.of(3L)
       );
   ```
   here and below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
##########
@@ -55,6 +62,63 @@ public void shouldNotAllowModificationOfInternalStateViaGetters() {
         assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
     }
 
+    @Test
+    public void shouldFollowHashCodeAndEqualsContract() {

Review comment:
       See my other comment about separate tests.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -57,7 +57,11 @@ public ThreadMetadataImpl(final String threadName,
                               final Set<TaskMetadata> standbyTasks) {
         this.mainConsumerClientId = mainConsumerClientId;
         this.restoreConsumerClientId = restoreConsumerClientId;
-        this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
+        if (producerClientIds != null) {
+            this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
+        } else {
+            this.producerClientIds = Collections.emptySet();
+        }

Review comment:
       I could not reproduce this issue. The set of producer IDs should never be `null`. If it is, the test is wrong.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
##########
@@ -18,30 +18,39 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collection;
+import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 public class StreamsMetadataTest {
 
     private static final HostInfo HOST_INFO = new HostInfo("local", 12);
+    public static final Set<String> STATE_STORE_NAMES = mkSet("store1", "store2");
     private static final TopicPartition TP_0 = new TopicPartition("t", 0);
     private static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
+    public static final Set<String> STAND_BY_STORE_NAMES = mkSet("store2");
+    public static final Set<TopicPartition> STANDBY_TOPIC_PARTITIONS = mkSet(TP_1);
 
     private StreamsMetadata streamsMetadata;
 
     @Before
     public void setUp() {
-        streamsMetadata = new StreamsMetadata(
-            HOST_INFO,
-            mkSet("store1", "store2"),
-            mkSet(TP_0, TP_1),
-            mkSet("store2"),
-            mkSet(TP_1)
+        streamsMetadata = new StreamsMetadataImpl(
+                HOST_INFO,
+                STATE_STORE_NAMES,
+                TOPIC_PARTITIONS,
+                STAND_BY_STORE_NAMES,
+                STANDBY_TOPIC_PARTITIONS

Review comment:
       nit: The indentation was actually correct before. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/ThreadMetadata.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of a single thread running within a {@link KafkaStreams} application.
+ */
+public interface ThreadMetadata {
+
+
+    /**
+     * @return the state of the Thread
+     */
+    String threadState();
+
+    /**
+     * @return the name of the Thread
+     */
+    String threadName();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current active tasks
+     * @return a set of metadata for the active tasks
+     */
+    Set<TaskMetadata> activeTasks();
+
+    /**
+     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
+     * @return a set of metadata for the standby tasks
+     */
+    Set<TaskMetadata> standbyTasks();
+
+    /**
+     * @return the consumer Client Id
+     */
+    String consumerClientId();
+
+    /**
+     * @return the restore consumer Client Id
+     */
+    String restoreConsumerClientId();
+
+    /**
+     * This function will return the set of Client Ids for the producers
+     * @return set of producer Client Ids
+     */
+    Set<String> producerClientIds();
+
+    /**
+     * @return the admin Client Id
+     */
+    String adminClientId();
+
+    /**
+     * Compares the specified object with this ThreadMetadata. Returns {@code true} if and only if the specified object is
+     * also a TaskMetadata and both {@code threadName()} are equal, {@code threadState()} are equal, {@code activeTasks()} contain the same

Review comment:
       `TaskMetadata` -> `ThreadMetadata`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -57,7 +57,11 @@ public ThreadMetadataImpl(final String threadName,
                               final Set<TaskMetadata> standbyTasks) {
         this.mainConsumerClientId = mainConsumerClientId;
         this.restoreConsumerClientId = restoreConsumerClientId;
-        this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
+        if (producerClientIds != null) {
+            this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
+        } else {
+            this.producerClientIds = Collections.emptySet();
+        }

Review comment:
       OK, now I see. You meant `StreamThreadTest`. There you need to set up the mock for the `TaskManager` to return at least an empty set with `expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());`.
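
   The failure mode discussed in this thread can be reproduced without any Kafka classes. The following is a minimal sketch, assuming a hypothetical `ProducerIdsHolder` class as a stand-in for `ThreadMetadataImpl` (it is not the real class). It only shows that `Collections.unmodifiableSet` rejects `null`, which is why a mock that returns `null` for the producer client IDs surfaces as an NPE in the constructor, and why stubbing an empty set avoids it.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for ThreadMetadataImpl; not the real Kafka class.
public class ProducerIdsHolder {
    private final Set<String> producerClientIds;

    public ProducerIdsHolder(final Set<String> producerClientIds) {
        // Collections.unmodifiableSet throws NullPointerException for a null argument,
        // so no explicit null guard is needed if callers never pass null.
        this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
    }

    public Set<String> producerClientIds() {
        return producerClientIds;
    }

    // Returns true if constructing the holder with the given set throws an NPE.
    public static boolean failsWithNpe(final Set<String> ids) {
        try {
            new ProducerIdsHolder(ids);
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        // An unstubbed mock would typically hand back null here.
        System.out.println(failsWithNpe(null));
        // A stub that returns an empty set keeps the constructor happy.
        System.out.println(failsWithNpe(Collections.emptySet()));
    }
}
```

   Keeping the constructor strict and fixing the mock, as suggested above, means a `null` from production code would still fail loudly instead of being silently replaced by an empty set.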

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskMetadataImplTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+
+public class TaskMetadataImplTest {
+
+    public static final TaskId TASK_ID = new TaskId(1, 2);
+    public static final TopicPartition TP_0 = new TopicPartition("t", 0);
+    public static final TopicPartition TP_1 = new TopicPartition("t", 1);
+    public static final Set<TopicPartition> TOPIC_PARTITIONS = mkSet(TP_0, TP_1);
+    public static final Map<TopicPartition, Long> COMMITTED_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 2L));
+    public static final Map<TopicPartition, Long> END_OFFSETS = mkMap(mkEntry(TP_1, 1L), mkEntry(TP_1, 3L));
+    public static final Optional<Long> TIME_CURRENT_IDLING_STARTED = Optional.of(3L);
+
+    private TaskMetadata taskMetadata;
+
+    @Before
+    public void setUp() {
+        taskMetadata = new TaskMetadataImpl(
+                TASK_ID,
+                TOPIC_PARTITIONS,
+                COMMITTED_OFFSETS,
+                END_OFFSETS,
+                TIME_CURRENT_IDLING_STARTED);
+    }
+
+    @Test
+    public void shouldNotAllowModificationOfInternalStateViaGetters() {
+        assertTrue(isUnmodifiable(taskMetadata.topicPartitions()));
+        assertTrue(isUnmodifiable(taskMetadata.committedOffsets()));
+        assertTrue(isUnmodifiable(taskMetadata.endOffsets()));

Review comment:
       In general, for new assertions we usually use
   ```suggestion
           assertThat(isUnmodifiable(taskMetadata.topicPartitions()), is(true));
           assertThat(isUnmodifiable(taskMetadata.committedOffsets()), is(true));
           assertThat(isUnmodifiable(taskMetadata.endOffsets()), is(true));
   ```
   This applies here and in other places of this PR.
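
   The `isUnmodifiable` helper used by these assertions is not shown in this excerpt. A minimal sketch of how such a helper might look, assuming it works by attempting a mutation and checking whether the collection refuses it (the class name `UnmodifiableCheck` and both overloads are assumptions for illustration, not the PR's code):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

// Hypothetical helper class; the real test may implement this differently.
public class UnmodifiableCheck {

    // Tries to clear the collection; an unmodifiable wrapper refuses with
    // UnsupportedOperationException.
    static boolean isUnmodifiable(final Collection<?> collection) {
        try {
            collection.clear();
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }

    // Same idea for maps, e.g. committed offsets and end offsets.
    static boolean isUnmodifiable(final Map<?, ?> map) {
        try {
            map.clear();
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(isUnmodifiable(Collections.unmodifiableSet(new HashSet<String>())));
        System.out.println(isUnmodifiable(Collections.unmodifiableMap(new HashMap<String, Long>())));
        System.out.println(isUnmodifiable(new HashSet<String>()));
    }
}
```

   Note that a check like this mutates the argument when it is modifiable, so it is only suitable for throwaway test fixtures.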







[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-856977741


   Failure was:
   ```
   [2021-06-08T14:01:59.439Z] FAILURE: Build failed with an exception.
   [2021-06-08T14:01:59.439Z] 
   [2021-06-08T14:01:59.439Z] * What went wrong:
   [2021-06-08T14:01:59.439Z] Execution failed for task ':core:integrationTest'.
   [2021-06-08T14:01:59.439Z] > Process 'Gradle Test Executor 127' finished with non-zero exit value 1
   [2021-06-08T14:01:59.439Z]   This problem might be caused by incorrect test process configuration.
   [2021-06-08T14:01:59.439Z]   Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.0.2/userguide/java_testing.html#sec:test_execution
   ```
   
   This is in addition to the usual `RaftClusterTest` failures.





[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657132576



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1478,8 +1499,36 @@ public void cleanUp() {
      * @param storeName the {@code storeName} to find metadata for
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
      * this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead
      */
-    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
+     * <ul>
+     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @param storeName the {@code storeName} to find metadata for
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
+     * this application
+     */
+    public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {

Review comment:
       I decided to go for the pattern xxxForxxx to keep consistency among different changes.
   
   It is now `streamsMetadataForStore` but happy to change if anyone has reasons against it.
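
   For readers following the rename, the deprecation pattern in this hunk can be sketched in isolation. This is a simplified illustration under stated assumptions: `DeprecationSketch`, `Metadata`, `MetadataImpl`, and `LegacyMetadata` are hypothetical stand-ins for `KafkaStreams`, the new `StreamsMetadata` interface, its internal implementation, and the deprecated `o.a.k.streams.state.StreamsMetadata` class. Unlike the PR, the deprecated method here delegates to the new one before converting.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for KafkaStreams; none of these types are the real API.
public class DeprecationSketch {

    // New public interface.
    public interface Metadata {
        String host();
        Set<String> stateStoreNames();
    }

    // Internal implementation of the new interface.
    public static final class MetadataImpl implements Metadata {
        private final String host;
        private final Set<String> stateStoreNames;

        public MetadataImpl(final String host, final Set<String> stateStoreNames) {
            this.host = host;
            this.stateStoreNames = Collections.unmodifiableSet(stateStoreNames);
        }

        @Override
        public String host() {
            return host;
        }

        @Override
        public Set<String> stateStoreNames() {
            return stateStoreNames;
        }
    }

    // Legacy concrete class that existing callers still compile against.
    public static final class LegacyMetadata {
        public final String host;
        public final Set<String> stateStoreNames;

        public LegacyMetadata(final String host, final Set<String> stateStoreNames) {
            this.host = host;
            this.stateStoreNames = stateStoreNames;
        }
    }

    private final Collection<Metadata> all;

    public DeprecationSketch(final Collection<Metadata> all) {
        this.all = all;
    }

    // New method: returns the interface type.
    public Collection<Metadata> streamsMetadataForStore(final String storeName) {
        return all.stream()
            .filter(m -> m.stateStoreNames().contains(storeName))
            .collect(Collectors.toList());
    }

    // Old method: kept for compatibility, converts each result to the legacy type.
    @Deprecated
    public Collection<LegacyMetadata> allMetadataForStore(final String storeName) {
        return streamsMetadataForStore(storeName).stream()
            .map(m -> new LegacyMetadata(m.host(), m.stateStoreNames()))
            .collect(Collectors.toSet());
    }

    public static void main(final String[] args) {
        final DeprecationSketch streams = new DeprecationSketch(Arrays.asList(
            new MetadataImpl("host-a", Collections.singleton("store1")),
            new MetadataImpl("host-b", Collections.singleton("store2"))
        ));
        System.out.println(streams.streamsMetadataForStore("store1").size());
    }
}
```

   Because Java cannot overload on return type alone, the new method needs a different name from the deprecated one, which is what motivates the naming discussion here.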
   







[GitHub] [kafka] jlprat commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-867486897


   I see some of the usual suspects among the test failures, but `org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testSwitchingToTopicCreationEnabled` is new. Looking at the stack trace, it doesn't seem related to the changes made in this PR.
   
   ```
   java.lang.NullPointerException
   	at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
   	at org.reflections.Store.getAllIncluding(Store.java:82)
   	at org.reflections.Store.getAll(Store.java:93)
   	at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
   	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:352)
   	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:337)
   	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
   	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
   	at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
   	at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
   	at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:93)
   	at org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
   	at org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:174)
   	at org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.lambda$testSwitchingToTopicCreationEnabled$1(SourceConnectorsIntegrationTest.java:197)
   	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
   	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:559)
   	at org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testSwitchingToTopicCreationEnabled(SourceConnectorsIntegrationTest.java:197)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
   	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
   	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
   	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
   	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
   	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
   	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
   	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
   	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
   	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
   	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
   	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
   	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
   	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
   	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:121)
   	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
   	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:182)
   	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:164)
   	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:414)
   	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
   	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:48)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:56)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   Is there some relation I'm missing, or should I create a new issue for this flaky test?





[GitHub] [kafka] jlprat commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r657134887



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ThreadMetadataImpl.java
##########
@@ -57,7 +57,11 @@ public ThreadMetadataImpl(final String threadName,
                               final Set<TaskMetadata> standbyTasks) {
         this.mainConsumerClientId = mainConsumerClientId;
         this.restoreConsumerClientId = restoreConsumerClientId;
-        this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
+        if (producerClientIds != null) {
+            this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
+        } else {
+            this.producerClientIds = Collections.emptySet();
+        }

Review comment:
       @cadonna, I needed to add this guard because `ThreadMetadataTest` was failing with an NPE.







[GitHub] [kafka] cadonna commented on pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#issuecomment-868685809


   The test failures are unrelated and known to be flaky:
   ```
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
   Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   ``` 





[GitHub] [kafka] ableegoldman commented on a change in pull request #10840: KAFKA-12849: KIP-744 TaskMetadata ThreadMetadata StreamsMetadata as API

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10840:
URL: https://github.com/apache/kafka/pull/10840#discussion_r656694680



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskMetadataImpl.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+public class TaskMetadataImpl implements TaskMetadata {
+
+    private final TaskId taskId;
+
+    private final Set<TopicPartition> topicPartitions;
+
+    private final Map<TopicPartition, Long> committedOffsets;
+
+    private final Map<TopicPartition, Long> endOffsets;
+
+    private final Optional<Long> timeCurrentIdlingStarted;
+
+    public TaskMetadataImpl(final TaskId taskId,
+                            final Set<TopicPartition> topicPartitions,
+                            final Map<TopicPartition, Long> committedOffsets,
+                            final Map<TopicPartition, Long> endOffsets,
+                            final Optional<Long> timeCurrentIdlingStarted) {
+        this.taskId = taskId;
+        this.topicPartitions = topicPartitions;
+        this.committedOffsets = committedOffsets;
+        this.endOffsets = endOffsets;
+        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> committedOffsets() {
+        return committedOffsets;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets() {
+        return endOffsets;
+    }
+
+    @Override
+    public Optional<Long> timeCurrentIdlingStarted() {
+        return timeCurrentIdlingStarted;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final org.apache.kafka.streams.processor.internals.TaskMetadataImpl that = (org.apache.kafka.streams.processor.internals.TaskMetadataImpl) o;

Review comment:
       Does this need to be fully qualified? There's only one `TaskMetadataImpl` class, right?
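
   To illustrate the point about the cast: inside the class, the simple name already resolves to the enclosing type, so `equals` could be written roughly as in the sketch below. `TaskMetadataSketch` is a hypothetical stand-in that uses `String` in place of `TaskId` and `TopicPartition` so it compiles without Kafka. The fields it compares are an assumption, since the rest of the real method is not visible in this hunk.

```java
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for TaskMetadataImpl; String replaces TaskId and
// TopicPartition so the sketch compiles without Kafka on the classpath.
final class TaskMetadataSketch {

    private final String taskId;
    private final Set<String> topicPartitions;

    TaskMetadataSketch(final String taskId, final Set<String> topicPartitions) {
        this.taskId = taskId;
        this.topicPartitions = topicPartitions;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // The simple class name is enough: no other type with this name is in scope.
        final TaskMetadataSketch that = (TaskMetadataSketch) o;
        // Assumption: which fields the real equals() compares is not shown in the diff.
        return Objects.equals(taskId, that.taskId)
            && Objects.equals(topicPartitions, that.topicPartitions);
    }

    @Override
    public int hashCode() {
        // Kept consistent with equals(): same fields, same order.
        return Objects.hash(taskId, topicPartitions);
    }
}
```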

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1478,8 +1499,36 @@ public void cleanUp() {
      * @param storeName the {@code storeName} to find metadata for
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
      * this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead
      */
-    public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that
+     * <ul>
+     *   <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
+     *       instances that belong to the same Kafka Streams application)</li>
+     *   <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
+     * </ul>
+     * and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @param storeName the {@code storeName} to find metadata for
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
+     * this application
+     */
+    public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {

Review comment:
       I don't feel too strongly about any of these names, but it's probably best to try and keep them (a) consistent with each other, where possible, and (b) consistent with the return type. Assuming we go with something like `allStreamsMetadata` for the new method above, maybe something like `streamsMetadataForStore` would be appropriate for this one?
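
   A rough sketch of how the suggested names could sit next to the deprecated ones. Everything here is hypothetical: `MetadataNamingSketch` is not a Kafka class, host names stand in for `StreamsMetadata` objects, and a plain map stands in for the internal metadata state. Unlike the PR, the deprecated methods here simply delegate rather than convert to the old return type.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: host names stand in for StreamsMetadata objects, and a
// plain map stands in for the internal metadata state.
final class MetadataNamingSketch {

    // Host name -> names of the state stores hosted on that instance.
    private final Map<String, Set<String>> storesByHost;

    MetadataNamingSketch(final Map<String, Set<String>> storesByHost) {
        this.storesByHost = storesByHost;
    }

    /** Suggested name: metadata for all Streams instances. */
    public Collection<String> allStreamsMetadata() {
        return new TreeSet<>(storesByHost.keySet());
    }

    /** Suggested name: metadata for the instances hosting the given store. */
    public Collection<String> streamsMetadataForStore(final String storeName) {
        final Set<String> hosts = new TreeSet<>();
        for (final Map.Entry<String, Set<String>> entry : storesByHost.entrySet()) {
            if (entry.getValue().contains(storeName)) {
                hosts.add(entry.getKey());
            }
        }
        return hosts;
    }

    /** @deprecated use {@link #allStreamsMetadata()} instead. */
    @Deprecated
    public Collection<String> allMetadata() {
        return allStreamsMetadata();
    }

    /** @deprecated use {@link #streamsMetadataForStore(String)} instead. */
    @Deprecated
    public Collection<String> allMetadataForStore(final String storeName) {
        return streamsMetadataForStore(storeName);
    }
}
```

   With these names, both new methods share the `StreamsMetadata` vocabulary of their return type, which is the consistency being asked for.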

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##########
@@ -27,10 +27,12 @@
 
 /**
  * Represents the state of a single task running within a {@link KafkaStreams} application.
+ * @deprecated since 3.0, not intended for public use, use {@link org.apache.kafka.streams.TaskMetadata} instead.

Review comment:
       ```suggestion
    * @deprecated since 3.0, use {@link org.apache.kafka.streams.TaskMetadata} instead.
   ```
   I think people might be a bit confused if we say this; just point them to the new interface.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##########
@@ -229,7 +230,7 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePart
                                  final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
         if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
             allMetadata = Collections.emptyList();
-            localMetadata.set(new StreamsMetadata(thisHost,
+            localMetadata.set(new StreamsMetadataImpl(thisHost,
                                                   Collections.emptySet(),
                                                   Collections.emptySet(),
                                                   Collections.emptySet(),

Review comment:
       nit: fix alignment (here and in the change below)

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1458,8 +1457,30 @@ public void cleanUp() {
      * Note: this is a point in time view and it may change due to partition reassignment.
      *
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     * @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata}
      */
-    public Collection<StreamsMetadata> allMetadata() {
+    @Deprecated
+    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
+        validateIsRunningOrRebalancing();
+        return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
+                new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
+                        streamsMetadata.stateStoreNames(),
+                        streamsMetadata.topicPartitions(),
+                        streamsMetadata.standbyStateStoreNames(),
+                        streamsMetadata.standbyTopicPartitions()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
+     * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
+     * <p>
+     * Note: this is a point in time view and it may change due to partition reassignment.
+     *
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
+     */
+    public Collection<StreamsMetadata> allRunningMetadata() {

Review comment:
       How about `allStreamsMetadata`? I think that gets at the core difference here, which is that this returns metadata for all Streams instances.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamsMetadataImpl.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadataImpl implements StreamsMetadata {
+    /**
+     * Sentinel to indicate that the StreamsMetadata is currently unavailable. This can occur during rebalance
+     * operations.
+     */
+    public final static StreamsMetadataImpl NOT_AVAILABLE = new StreamsMetadataImpl(HostInfo.unavailable(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet(),
+                                                                            Collections.emptySet());
+

Review comment:
       nit: fix alignment

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1558,12 +1607,45 @@ private void processStreamThread(final Consumer<StreamThread> consumer) {
         for (final StreamThread thread : copy) consumer.accept(thread);
     }
 
+    /**
+     * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
+     *
+     * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
+     * @deprecated since 3.0 use {@link #threadsMetadata()}
+     */
+    @Deprecated
+    @SuppressWarnings("deprecation")
+    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
+        return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
+                threadMetadata.threadName(),
+                threadMetadata.threadState(),
+                threadMetadata.consumerClientId(),
+                threadMetadata.restoreConsumerClientId(),
+                threadMetadata.producerClientIds(),
+                threadMetadata.adminClientId(),
+                threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet()),
+                threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
+                        taskMetadata.taskId().toString(),
+                        taskMetadata.topicPartitions(),
+                        taskMetadata.committedOffsets(),
+                        taskMetadata.endOffsets(),
+                        taskMetadata.timeCurrentIdlingStarted())
+                ).collect(Collectors.toSet())))
+                .collect(Collectors.toSet());
+    }
+
     /**
      * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
      *
      * @return the set of {@link ThreadMetadata}.
      */
-    public Set<ThreadMetadata> localThreadsMetadata() {
+    public Set<ThreadMetadata> threadsMetadata() {

Review comment:
       `localThreadMetadata` sounds better to me (even better than the original `localThreadsMetadata`, tbh), but I'm fine with `threadsMetadata` too.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsMetadata.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.state.HostInfo;
+
+import java.util.Set;
+
+public interface StreamsMetadata {
+
+    /**
+     * The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
+     * instance, which is typically host/port
+     *
+     * @return {@link HostInfo} corresponding to the streams instance
+     */
+    HostInfo hostInfo();
+
+    /**
+     * State stores owned by the instance as an active replica
+     *
+     * @return set of active state store names
+     */
+    Set<String> stateStoreNames();
+
+    /**
+     * Topic partitions consumed by the instance as an active replica
+     *
+     * @return set of active topic partitions
+     */
+    Set<TopicPartition> topicPartitions();
+
+    /**
+     * (Source) Topic partitions for which the instance acts as standby.
+     *
+     * @return set of standby topic partitions
+     */
+    Set<TopicPartition> standbyTopicPartitions();
+
+    /**
+     * State stores owned by the instance as a standby replica
+     *
+     * @return set of standby state store names
+     */
+    Set<String> standbyStateStoreNames();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().host();}
+     */
+    String host();
+
+    /**
+     * This method is equivalent to call {@code StreamsMetadata.hostInfo().port();}
+     */
+    int port();
+
+    /**
+     * Compares the specified object with this StreamsMetadata. Returns {@code true} if and only if the specified object is
+     * also a StreamsMetadata and for both {@code hostInfo()} are equal, and {@code stateStoreNames()}, {@code topicPartitions()},
+     * {@code standbyStateStoreNames()}, and {@code standbyTopicPartitions()} contain the same elements.
+     */
+    boolean equals(Object o);
+
+    /**
+     * Returns the hash code value for this TaskMetadata. The hash code of a list is defined to be the result of the following calculation:
+     * <pre>
+     * {@code
+     * Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(), standbyStateStoreNames(), standbyTopicPartitions());
+     * }

Review comment:
       missing closing tag `</pre>`?
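
   For reference, a minimal sketch of the Javadoc shape with the `<pre>` block closed. `StreamsMetadataSketch` and `SimpleStreamsMetadata` are hypothetical, trimmed-down stand-ins (two accessors instead of five, `String` instead of `HostInfo`), not the actual interface from the PR.

```java
import java.util.Objects;
import java.util.Set;

// Hypothetical, trimmed-down stand-in for the StreamsMetadata interface.
interface StreamsMetadataSketch {

    String host();

    Set<String> stateStoreNames();

    /**
     * Returns the hash code value for this StreamsMetadata, defined as the result of:
     * <pre>
     * {@code
     * Objects.hash(host(), stateStoreNames());
     * }
     * </pre>
     */
    int hashCode();
}

// Small implementation so the documented contract can be exercised.
final class SimpleStreamsMetadata implements StreamsMetadataSketch {

    private final String host;
    private final Set<String> stateStoreNames;

    SimpleStreamsMetadata(final String host, final Set<String> stateStoreNames) {
        this.host = host;
        this.stateStoreNames = stateStoreNames;
    }

    @Override
    public String host() {
        return host;
    }

    @Override
    public Set<String> stateStoreNames() {
        return stateStoreNames;
    }

    @Override
    public int hashCode() {
        // Matches the formula stated in the interface Javadoc.
        return Objects.hash(host(), stateStoreNames());
    }
}
```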



