You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/10/19 04:43:12 UTC
[kafka] branch trunk updated: KAFKA-15616: Client telemetry states and transition (KIP-714) (#14566)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 26aa353dc1c KAFKA-15616: Client telemetry states and transition (KIP-714) (#14566)
26aa353dc1c is described below
commit 26aa353dc1c34f932e6d05d5b270a7f3690c36cf
Author: Apoorv Mittal <ap...@gmail.com>
AuthorDate: Thu Oct 19 05:43:05 2023 +0100
KAFKA-15616: Client telemetry states and transition (KIP-714) (#14566)
Part of KIP-714.
Reviewers: Andrew Schofield <as...@confluent.io>, Philip Nee <pn...@confluent.io>, Kirk True <kt...@confluent.io>, Walker Carlson <wc...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../common/telemetry/ClientTelemetryState.java | 166 +++++++++++++++++++++
.../common/telemetry/ClientTelemetryStateTest.java | 111 ++++++++++++++
2 files changed, 277 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
new file mode 100644
index 00000000000..7dbb2703947
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * State that determines where the client is in the telemetry loop, i.e. {@code subscribe -> wait -> push}.
+ */
+public enum ClientTelemetryState {
+
+    /**
+     * Client needs a subscription from the broker.
+     */
+    SUBSCRIPTION_NEEDED,
+
+    /**
+     * Network I/O is in progress to retrieve the subscription.
+     */
+    SUBSCRIPTION_IN_PROGRESS,
+
+    /**
+     * Awaiting the telemetry interval before pushing metrics to the broker.
+     */
+    PUSH_NEEDED,
+
+    /**
+     * Network I/O is in progress for pushing the metrics payload.
+     */
+    PUSH_IN_PROGRESS,
+
+    /**
+     * Client needs to push the terminal metrics payload.
+     */
+    TERMINATING_PUSH_NEEDED,
+
+    /**
+     * Network I/O is in progress for pushing the terminal metrics payload.
+     */
+    TERMINATING_PUSH_IN_PROGRESS,
+
+    /**
+     * Telemetry client terminated; no more work should be performed.
+     */
+    TERMINATED;
+
+    // Transition table: populated once by the static initializer below and never modified afterwards.
+    private static final Map<ClientTelemetryState, List<ClientTelemetryState>> VALID_NEXT_STATES = new EnumMap<>(ClientTelemetryState.class);
+
+    static {
+        /*
+         If the client needs a subscription, then issue the telemetry API to fetch the subscription from the broker.
+
+         However, it's still possible that the client doesn't get very far before terminating.
+        */
+        VALID_NEXT_STATES.put(
+            SUBSCRIPTION_NEEDED, Arrays.asList(SUBSCRIPTION_IN_PROGRESS, TERMINATED));
+
+        /*
+         If the client is finished waiting for the subscription, then the client is ready to push
+         telemetry. But it's possible that no telemetry metrics are requested, hence the client should
+         go back to the subscription needed state, i.e. request the next updated subscription.
+
+         However, it's still possible that the client doesn't get very far before terminating.
+        */
+        VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS, Arrays.asList(PUSH_NEEDED,
+            SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED));
+
+        /*
+         If the client transitions out of this state, then it should proceed to push the metrics.
+         But if the push fails (network issues, the subscription changed, etc.) then the client
+         should go back to the subscription needed state and request the next subscription.
+
+         However, it's still possible that the client doesn't get very far before terminating.
+        */
+        VALID_NEXT_STATES.put(PUSH_NEEDED, Arrays.asList(PUSH_IN_PROGRESS, SUBSCRIPTION_NEEDED,
+            TERMINATING_PUSH_NEEDED, TERMINATED));
+
+        /*
+         A successful push should transition the client to push needed, which sends the next telemetry
+         metrics after the elapsed wait interval. But if the push fails (network issues, the
+         subscription changed, etc.) then the client should go back to the subscription needed state
+         and request the next subscription.
+
+         However, it's still possible that the client doesn't get very far before terminating.
+        */
+        VALID_NEXT_STATES.put(
+            PUSH_IN_PROGRESS, Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED,
+                TERMINATED));
+
+        /*
+         If the client is moving out of this state, then it tries to send the last metrics push.
+
+         However, it's still possible that the client doesn't get very far before terminating.
+        */
+        VALID_NEXT_STATES.put(
+            TERMINATING_PUSH_NEEDED, Arrays.asList(TERMINATING_PUSH_IN_PROGRESS, TERMINATED));
+
+        /*
+         The client should only transition to the terminated state.
+        */
+        VALID_NEXT_STATES.put(TERMINATING_PUSH_IN_PROGRESS, Collections.singletonList(TERMINATED));
+
+        /*
+         The client should never be able to transition out of the terminated state.
+        */
+        VALID_NEXT_STATES.put(TERMINATED, Collections.emptyList());
+    }
+
+    /**
+     * Validates that {@code newState} is one of the valid transitions from the current
+     * {@code ClientTelemetryState}.
+     *
+     * @param newState State into which the telemetry client is requesting to transition; must be
+     *                 non-{@code null} (a {@code null} value always fails validation)
+     * @return {@code newState} if validation succeeds. Returning {@code newState} allows the caller
+     *         to chain the validation with the state assignment.
+     * @throws IllegalStateException if the state transition validation fails.
+     */
+    public ClientTelemetryState validateTransition(ClientTelemetryState newState) {
+        // Every state has an entry in VALID_NEXT_STATES; the null checks below are purely defensive.
+        List<ClientTelemetryState> allowableStates = VALID_NEXT_STATES.get(this);
+
+        if (allowableStates != null && allowableStates.contains(newState)) {
+            return newState;
+        }
+
+        // State transition validation failed, construct error message and throw exception.
+        String validStatesClause;
+        if (allowableStates != null && !allowableStates.isEmpty()) {
+            validStatesClause = String.format("the valid telemetry state transitions from %s are: %s",
+                this, Utils.join(allowableStates, ", "));
+        } else {
+            validStatesClause = String.format("there are no valid telemetry state transitions from %s", this);
+        }
+
+        String message = String.format("Invalid telemetry state transition from %s to %s; %s",
+            this,
+            newState,
+            validStatesClause);
+
+        throw new IllegalStateException(message);
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java
new file mode 100644
index 00000000000..9e6831032b4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/ClientTelemetryStateTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ClientTelemetryStateTest {
+
+    @Test
+    public void testValidateTransitionForSubscriptionNeeded() {
+        List<ClientTelemetryState> validStates = new ArrayList<>();
+        validStates.add(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS);
+        validStates.add(ClientTelemetryState.TERMINATED);
+
+        testValidateTransition(ClientTelemetryState.SUBSCRIPTION_NEEDED, validStates);
+    }
+
+    @Test
+    public void testValidateTransitionForSubscriptionInProgress() {
+        List<ClientTelemetryState> validStates = new ArrayList<>();
+        validStates.add(ClientTelemetryState.PUSH_NEEDED);
+        validStates.add(ClientTelemetryState.SUBSCRIPTION_NEEDED);
+        validStates.add(ClientTelemetryState.TERMINATING_PUSH_NEEDED);
+        validStates.add(ClientTelemetryState.TERMINATED);
+
+        testValidateTransition(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, validStates);
+    }
+
+    @Test
+    public void testValidateTransitionForPushNeeded() {
+        List<ClientTelemetryState> validStates = new ArrayList<>();
+        validStates.add(ClientTelemetryState.PUSH_IN_PROGRESS);
+        validStates.add(ClientTelemetryState.SUBSCRIPTION_NEEDED);
+        validStates.add(ClientTelemetryState.TERMINATING_PUSH_NEEDED);
+        validStates.add(ClientTelemetryState.TERMINATED);
+
+        testValidateTransition(ClientTelemetryState.PUSH_NEEDED, validStates);
+    }
+
+    @Test
+    public void testValidateTransitionForPushInProgress() {
+        List<ClientTelemetryState> validStates = new ArrayList<>();
+        validStates.add(ClientTelemetryState.PUSH_NEEDED);
+        validStates.add(ClientTelemetryState.SUBSCRIPTION_NEEDED);
+        validStates.add(ClientTelemetryState.TERMINATING_PUSH_NEEDED);
+        validStates.add(ClientTelemetryState.TERMINATED);
+
+        testValidateTransition(ClientTelemetryState.PUSH_IN_PROGRESS, validStates);
+    }
+
+    @Test
+    public void testValidateTransitionForTerminating() {
+        List<ClientTelemetryState> validStates = new ArrayList<>();
+        validStates.add(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS);
+        validStates.add(ClientTelemetryState.TERMINATED);
+
+        testValidateTransition(ClientTelemetryState.TERMINATING_PUSH_NEEDED, validStates);
+    }
+
+    @Test
+    public void testValidateTransitionForTerminatingPushInProgress() {
+        testValidateTransition(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS,
+            Collections.singletonList(ClientTelemetryState.TERMINATED));
+    }
+
+    @Test
+    public void testValidateTransitionForTerminated() {
+        // There's no transitioning out of the terminated state.
+        testValidateTransition(ClientTelemetryState.TERMINATED, Collections.emptyList());
+    }
+
+    /**
+     * Asserts that {@code oldState} accepts exactly the transitions in {@code validStates}: each
+     * valid transition must return the requested state, and every other state must be rejected
+     * with an {@link IllegalStateException}.
+     */
+    private void testValidateTransition(ClientTelemetryState oldState, List<ClientTelemetryState> validStates) {
+        for (ClientTelemetryState newState : validStates) {
+            // validateTransition returns newState so callers can chain it into the state assignment.
+            assertEquals(newState, oldState.validateTransition(newState),
+                "Valid transition from " + oldState + " to " + newState + " should return " + newState);
+        }
+
+        // Copy value to a new list for modification.
+        List<ClientTelemetryState> invalidStates = new ArrayList<>(Arrays.asList(ClientTelemetryState.values()));
+        // Remove the valid states from the list of all states, leaving only the invalid.
+        invalidStates.removeAll(validStates);
+
+        for (ClientTelemetryState newState : invalidStates) {
+            Executable e = () -> oldState.validateTransition(newState);
+            String unexpectedSuccessMessage = "Should have thrown an IllegalStateException for transitioning from " + oldState + " to " + newState;
+            assertThrows(IllegalStateException.class, e, unexpectedSuccessMessage);
+        }
+    }
+
+}