You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/10/19 04:43:12 UTC

[kafka] branch trunk updated: KAFKA-15616: Client telemetry states and transition (KIP-714) (#14566)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 26aa353dc1c KAFKA-15616: Client telemetry states and transition (KIP-714) (#14566)
26aa353dc1c is described below

commit 26aa353dc1c34f932e6d05d5b270a7f3690c36cf
Author: Apoorv Mittal <ap...@gmail.com>
AuthorDate: Thu Oct 19 05:43:05 2023 +0100

    KAFKA-15616: Client telemetry states and transition (KIP-714) (#14566)
    
    Part of KIP-714.
    
    Reviewers: Andrew Schofield <as...@confluent.io>, Philip Nee <pn...@confluent.io>, Kirk True <kt...@confluent.io>, Walker Carlson <wc...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../common/telemetry/ClientTelemetryState.java     | 166 +++++++++++++++++++++
 .../common/telemetry/ClientTelemetryStateTest.java | 111 ++++++++++++++
 2 files changed, 277 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
new file mode 100644
index 00000000000..7dbb2703947
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes where a client currently is in the telemetry cycle, i.e. the
+ * {@code subscribe->wait->push} loop.
+ * <p>
+ * Each constant names one step of that loop or of the shutdown sequence that ends it. Which
+ * state may follow which is fixed by this enum and can be checked with
+ * {@link #validateTransition(ClientTelemetryState)}.
+ */
+public enum ClientTelemetryState {
+
+    /**
+     * The client still has to obtain a telemetry subscription from the broker.
+     */
+    SUBSCRIPTION_NEEDED,
+
+    /**
+     * The network request that retrieves the subscription is in flight.
+     */
+    SUBSCRIPTION_IN_PROGRESS,
+
+    /**
+     * Waiting for the telemetry interval to elapse before metrics are pushed to the broker.
+     */
+    PUSH_NEEDED,
+
+    /**
+     * The network request that pushes the metrics payload is in flight.
+     */
+    PUSH_IN_PROGRESS,
+
+    /**
+     * The terminal metrics payload still has to be pushed.
+     */
+    TERMINATING_PUSH_NEEDED,
+
+    /**
+     * The network request that pushes the terminal metrics payload is in flight.
+     */
+    TERMINATING_PUSH_IN_PROGRESS,
+
+    /**
+     * The telemetry client is terminated; no more work should be performed.
+     */
+    TERMINATED;
+
+    /**
+     * For each state, the states it is allowed to move to next. Filled in once by the static
+     * initializer and consulted by {@link #validateTransition(ClientTelemetryState)}.
+     */
+    private static final Map<ClientTelemetryState, List<ClientTelemetryState>> VALID_NEXT_STATES;
+
+    static {
+        // Build the transition table, one entry per state.
+        Map<ClientTelemetryState, List<ClientTelemetryState>> transitions =
+            new EnumMap<>(ClientTelemetryState.class);
+
+        // Without a subscription the client issues the telemetry API call to fetch one from the
+        // broker. It may also terminate before getting any further.
+        transitions.put(SUBSCRIPTION_NEEDED,
+            Arrays.asList(SUBSCRIPTION_IN_PROGRESS, TERMINATED));
+
+        // Once the wait for the subscription is over the client is ready to push telemetry. When
+        // no telemetry metrics are requested it goes back to requesting the next, updated
+        // subscription instead. It may also terminate before getting any further.
+        transitions.put(SUBSCRIPTION_IN_PROGRESS,
+            Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED));
+
+        // Leaving this state normally means proceeding to push the metrics. If the push fails
+        // (network issues, the subscription changed, etc.) the client returns to requesting the
+        // next subscription. It may also terminate before getting any further.
+        transitions.put(PUSH_NEEDED,
+            Arrays.asList(PUSH_IN_PROGRESS, SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED));
+
+        // A successful push returns the client to push-needed, from where the next metrics are
+        // sent after the wait interval has elapsed. A failed push (network issues, the
+        // subscription changed, etc.) sends it back to requesting the next subscription. It may
+        // also terminate before getting any further.
+        transitions.put(PUSH_IN_PROGRESS,
+            Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED));
+
+        // Moving out of this state means attempting the last metrics push, unless the client
+        // terminates before getting that far.
+        transitions.put(TERMINATING_PUSH_NEEDED,
+            Arrays.asList(TERMINATING_PUSH_IN_PROGRESS, TERMINATED));
+
+        // The terminal push can only be followed by termination.
+        transitions.put(TERMINATING_PUSH_IN_PROGRESS, Collections.singletonList(TERMINATED));
+
+        // Terminated is final: nothing may follow it.
+        transitions.put(TERMINATED, Collections.emptyList());
+
+        VALID_NEXT_STATES = transitions;
+    }
+
+    /**
+     * Checks whether moving from this state to {@code newState} is a permitted transition.
+     * <p>
+     * The permitted successors are looked up in the transition table of this enum. The call
+     * itself changes nothing; the caller assigns the returned state.
+     *
+     * @param newState the state the telemetry client wants to move to; must be
+     *                 non-{@code null}
+     * @return {@code newState} itself when the transition is permitted, so that the call can be
+     *         chained directly into a state assignment
+     * @throws IllegalStateException if {@code newState} is not a permitted successor of this
+     *                               state
+     */
+    public ClientTelemetryState validateTransition(ClientTelemetryState newState) {
+        final List<ClientTelemetryState> successors = VALID_NEXT_STATES.get(this);
+        // TERMINATED maps to an empty list, so both the null and the empty check are needed.
+        final boolean hasSuccessors = successors != null && !successors.isEmpty();
+
+        if (hasSuccessors && successors.contains(newState)) {
+            return newState;
+        }
+
+        // The transition is rejected. Build an error message that also tells the caller which
+        // successors, if any, would have been valid from this state.
+        final String allowedClause;
+        if (hasSuccessors) {
+            allowedClause = String.format(
+                "the valid telemetry state transitions from %s are: %s",
+                this,
+                Utils.join(successors, ", "));
+        } else {
+            allowedClause = String.format(
+                "there are no valid telemetry state transitions from %s",
+                this);
+        }
+
+        throw new IllegalStateException(
+            String.format(
+                "Invalid telemetry state transition from %s to %s; %s",
+                this,
+                newState,
+                allowedClause));
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java
new file mode 100644
index 00000000000..9e6831032b4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Verifies, for every {@link ClientTelemetryState}, that {@code validateTransition} accepts the
+ * expected successor states and rejects all others.
+ */
+public class ClientTelemetryStateTest {
+
+    @Test
+    public void testValidateTransitionForSubscriptionNeeded() {
+        testValidateTransition(ClientTelemetryState.SUBSCRIPTION_NEEDED, Arrays.asList(
+            ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS,
+            ClientTelemetryState.TERMINATED));
+    }
+
+    @Test
+    public void testValidateTransitionForSubscriptionInProgress() {
+        testValidateTransition(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, Arrays.asList(
+            ClientTelemetryState.PUSH_NEEDED,
+            ClientTelemetryState.SUBSCRIPTION_NEEDED,
+            ClientTelemetryState.TERMINATING_PUSH_NEEDED,
+            ClientTelemetryState.TERMINATED));
+    }
+
+    @Test
+    public void testValidateTransitionForPushNeeded() {
+        testValidateTransition(ClientTelemetryState.PUSH_NEEDED, Arrays.asList(
+            ClientTelemetryState.PUSH_IN_PROGRESS,
+            ClientTelemetryState.SUBSCRIPTION_NEEDED,
+            ClientTelemetryState.TERMINATING_PUSH_NEEDED,
+            ClientTelemetryState.TERMINATED));
+    }
+
+    @Test
+    public void testValidateTransitionForPushInProgress() {
+        testValidateTransition(ClientTelemetryState.PUSH_IN_PROGRESS, Arrays.asList(
+            ClientTelemetryState.PUSH_NEEDED,
+            ClientTelemetryState.SUBSCRIPTION_NEEDED,
+            ClientTelemetryState.TERMINATING_PUSH_NEEDED,
+            ClientTelemetryState.TERMINATED));
+    }
+
+    @Test
+    public void testValidateTransitionForTerminating() {
+        testValidateTransition(ClientTelemetryState.TERMINATING_PUSH_NEEDED, Arrays.asList(
+            ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS,
+            ClientTelemetryState.TERMINATED));
+    }
+
+    @Test
+    public void testValidateTransitionForTerminatingPushInProgress() {
+        testValidateTransition(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS,
+            Collections.singletonList(ClientTelemetryState.TERMINATED));
+    }
+
+    @Test
+    public void testValidateTransitionForTerminated() {
+        // There's no transitioning out of the terminated state
+        testValidateTransition(ClientTelemetryState.TERMINATED, Collections.emptyList());
+    }
+
+    /**
+     * Asserts that {@code oldState} accepts every state in {@code validStates} and rejects all others.
+     *
+     * @param oldState    the state whose outgoing transitions are checked
+     * @param validStates the states {@code oldState} is expected to accept
+     */
+    private void testValidateTransition(ClientTelemetryState oldState, List<ClientTelemetryState> validStates) {
+        for (ClientTelemetryState newState : validStates) {
+            oldState.validateTransition(newState);
+        }
+
+        // Every state not listed as valid must be rejected. Work on a mutable copy of all states,
+        // because the list returned by Arrays.asList() is fixed-size and cannot have elements removed.
+        List<ClientTelemetryState> invalidStates = new ArrayList<>(Arrays.asList(ClientTelemetryState.values()));
+        invalidStates.removeAll(validStates);
+
+        for (ClientTelemetryState newState : invalidStates) {
+            Executable e = () -> oldState.validateTransition(newState);
+            String unexpectedSuccessMessage = "Should have thrown an IllegalStateException for transitioning from " + oldState + " to " + newState;
+            assertThrows(IllegalStateException.class, e, unexpectedSuccessMessage);
+        }
+    }
+
+}