Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/18 18:12:42 UTC

[GitHub] [kafka] lct45 opened a new pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

lct45 opened a new pull request #9614:
URL: https://github.com/apache/kafka/pull/9614


   Per [KIP-663](https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads), this PR adds a metric that records the number of stream threads that have failed over the lifetime of a client.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
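
A minimal stand-alone sketch of a cumulative failed-thread count, assuming only what the description above states: the value starts at zero and grows by one for each failed stream thread. `FailedThreadCounterSketch` is a hypothetical illustration class, not the Kafka `Sensor` API and not the code in this PR.

```java
import java.util.concurrent.atomic.DoubleAdder;

// Hypothetical stand-in for a cumulative-sum metric; not part of Kafka.
class FailedThreadCounterSketch {
    // DoubleAdder lets several threads add to the total without losing updates.
    private final DoubleAdder total = new DoubleAdder();

    // Called once per failed stream thread.
    void record() {
        total.add(1.0);
    }

    // Number of failures recorded since this counter was created.
    double value() {
        return total.sum();
    }
}
```

The total is never reset here, which mirrors the "over the lifetime of a client" wording; a windowed or rate-based metric would behave differently.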
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lct45 commented on pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#issuecomment-729864151


   @wcarlson5 @cadonna for initial review





[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r527117685



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -221,6 +221,9 @@ State setState(final State newState) {
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
             } else {
                 log.info("State transition from {} to {}", oldState, newState);
+                if (newState == State.DEAD) {
+                    failedStreamThreadSensor.record();
+                }

Review comment:
       Would that just be in `run()` of the GlobalStreamThread then?







[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533586582



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -619,8 +647,7 @@ public void shouldProvideCorrectStrings() {
     }
 
     private void setupRemoveSensorsTest(final Metrics metrics,
-                                        final String level,
-                                        final RecordingLevel recordingLevel) {

Review comment:
       This wasn't being used so I went ahead and took it out 







[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r529313022



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        final String key = CLIENT_LEVEL_GROUP;
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+            return Optional.ofNullable(metrics.getSensor(fullSensorName))
+                .orElseGet(() -> {
+                    clientLevelSensors.push(fullSensorName);
+                    return metrics.sensor(fullSensorName, recordingLevel, parents);
+                });

Review comment:
       Although we use this pattern in other methods, I think the following is a bit simpler to read:
   ```suggestion
           final Sensor sensor = metrics.getSensor(fullSensorName);
           if (sensor == null) {
               clientLevelSensors.push(fullSensorName);
               return metrics.sensor(fullSensorName, recordingLevel, parents);
           }
           return sensor;
   ```
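
The two get-or-create styles compared in this comment can be sketched side by side. This is a stand-alone illustration under stated assumptions: `SensorRegistrySketch` and its `HashMap` are hypothetical stand-ins for the metrics registry, and plain `Object` instances stand in for sensors. It is not the `StreamsMetricsImpl` code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical registry; a HashMap replaces the real metrics registry.
class SensorRegistrySketch {
    private final Map<String, Object> sensors = new HashMap<>();
    private final Deque<String> createdNames = new ArrayDeque<>();

    // Style in the diff: Optional.ofNullable(...).orElseGet(...).
    synchronized Object getOrCreateWithOptional(final String name) {
        return Optional.ofNullable(sensors.get(name))
            .orElseGet(() -> {
                createdNames.push(name);
                final Object created = new Object();
                sensors.put(name, created);
                return created;
            });
    }

    // Style in the suggestion: explicit null check.
    synchronized Object getOrCreateWithNullCheck(final String name) {
        final Object sensor = sensors.get(name);
        if (sensor == null) {
            createdNames.push(name);
            final Object created = new Object();
            sensors.put(name, created);
            return created;
        }
        return sensor;
    }

    // How many names were remembered for later removal.
    synchronized int createdCount() {
        return createdNames.size();
    }
}
```

Both methods return the existing entry when one is present and remember the name only on first creation, so the choice between them is about readability rather than behavior.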

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        final String key = CLIENT_LEVEL_GROUP;
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;

Review comment:
       You can inline the value of variable `key` here and remove `key`.
   
   ```suggestion
               final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -577,6 +579,38 @@ public void shouldGetExistingCacheLevelSensor() {
         assertThat(actualSensor, is(equalToObject(sensor)));
     }
 
+    @Test
+    public void shouldGetNewClientLevelSensor() {
+        final Metrics metrics = mock(Metrics.class);
+        final RecordingLevel recordingLevel = RecordingLevel.INFO;
+        setupGetNewSensorTest(metrics, recordingLevel);
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+
+        final Sensor actualSensor = streamsMetrics.clientLevelSensor(
+            SENSOR_NAME_1,
+            recordingLevel
+        );

Review comment:
       ```suggestion
           final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -2438,6 +2440,60 @@ public void shouldConstructAdminMetrics() {
         assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName());
     }
 
+    @Test
+    public void shouldCountFailedStreamThread() {
+        verifyFailedStreamThread(false);
+        verifyFailedStreamThread(true);
+    }
+
+    public void verifyFailedStreamThread(final boolean shouldFail) {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            e -> { }
+        ) {
+            @Override
+            void runOnce() {
+                setState(StreamThread.State.PENDING_SHUTDOWN);
+                if (shouldFail) {
+                    throw new StreamsException(Thread.currentThread().getName());
+                }
+            }
+        };
+        expect(taskManager.activeTaskMap()).andReturn(Collections.emptyMap());
+        expect(taskManager.standbyTaskMap()).andReturn(Collections.emptyMap());
+
+        taskManager.process(anyInt(), anyObject());
+        EasyMock.expectLastCall().andThrow(new StreamsException(Thread.currentThread().getName()));
+

Review comment:
       You can remove these lines. They are dead code.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -2438,6 +2440,60 @@ public void shouldConstructAdminMetrics() {
         assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName());
     }
 
+    @Test
+    public void shouldCountFailedStreamThread() {
+        verifyFailedStreamThread(false);
+        verifyFailedStreamThread(true);
+    }
+
+    public void verifyFailedStreamThread(final boolean shouldFail) {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            e -> { }
+        ) {
+            @Override
+            void runOnce() {
+                setState(StreamThread.State.PENDING_SHUTDOWN);
+                if (shouldFail) {
+                    throw new StreamsException(Thread.currentThread().getName());
+                }
+            }
+        };
+        expect(taskManager.activeTaskMap()).andReturn(Collections.emptyMap());
+        expect(taskManager.standbyTaskMap()).andReturn(Collections.emptyMap());
+
+        taskManager.process(anyInt(), anyObject());
+        EasyMock.expectLastCall().andThrow(new StreamsException(Thread.currentThread().getName()));
+
+        EasyMock.replay(taskManager);
+
+        thread.updateThreadMetadata("metadata");
+        thread.setState(StreamThread.State.STARTING);
+        thread.runLoop();
+
+        final Metric failedThreads = StreamsTestUtils.getMetricByName(metrics.metrics(), "failed-stream-threads", "stream-metrics");
+
+        assertEquals(shouldFail ? 1.0 : 0.0, failedThreads.metricValue());

Review comment:
       ```suggestion
           EasyMock.replay(taskManager);
           thread.updateThreadMetadata("metadata");
           thread.setState(StreamThread.State.STARTING);
           
           thread.runLoop();
   
           final Metric failedThreads = StreamsTestUtils.getMetricByName(metrics.metrics(), "failed-stream-threads", "stream-metrics");
           assertThat(failedThreads.metricValue(), is(shouldFail ? 1.0 : 0.0));
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -2438,6 +2440,60 @@ public void shouldConstructAdminMetrics() {
         assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName());
     }
 
+    @Test
+    public void shouldCountFailedStreamThread() {
+        verifyFailedStreamThread(false);
+        verifyFailedStreamThread(true);
+    }
+
+    public void verifyFailedStreamThread(final boolean shouldFail) {

Review comment:
       I would rename this method to `runAndVerifyFailedStreamThreadRecording()`.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -577,6 +579,38 @@ public void shouldGetExistingCacheLevelSensor() {
         assertThat(actualSensor, is(equalToObject(sensor)));
     }
 
+    @Test
+    public void shouldGetNewClientLevelSensor() {
+        final Metrics metrics = mock(Metrics.class);
+        final RecordingLevel recordingLevel = RecordingLevel.INFO;
+        setupGetNewSensorTest(metrics, recordingLevel);
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+
+        final Sensor actualSensor = streamsMetrics.clientLevelSensor(
+            SENSOR_NAME_1,
+            recordingLevel
+        );
+
+        verify(metrics);
+        assertThat(actualSensor, is(equalToObject(sensor)));
+    }
+
+    @Test
+    public void shouldGetExistingClientLevelSensor() {
+        final Metrics metrics = mock(Metrics.class);
+        final RecordingLevel recordingLevel = RecordingLevel.INFO;
+        setupGetExistingSensorTest(metrics);
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
+
+        final Sensor actualSensor = streamsMetrics.clientLevelSensor(
+            SENSOR_NAME_1,
+            recordingLevel
+        );

Review comment:
       ```suggestion
           final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -2438,6 +2440,60 @@ public void shouldConstructAdminMetrics() {
         assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName());
     }
 
+    @Test
+    public void shouldCountFailedStreamThread() {
+        verifyFailedStreamThread(false);
+        verifyFailedStreamThread(true);
+    }

Review comment:
       Could you please specify two separate unit tests? One could be named `shouldRecordFailedStreamThread()` and the other `shouldNotRecordFailedStreamThread()`.

##########
File path: streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##########
@@ -99,6 +121,29 @@ public void shouldAddAliveStreamThreadsMetric() {
         );
     }
 
+    @Test
+    public void shouldGetFailedStreamThreadsSensor() {
+        final String name = "failed-stream-threads";
+        final String description = "The number of failed stream threads since the start of the Kafka Streams client";
+        expect(streamsMetrics.clientLevelSensor(name, RecordingLevel.INFO)).andReturn(expectedSensor);
+        expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap);
+        StreamsMetricsImpl.addSumMetricToSensor(
+            expectedSensor,
+            CLIENT_LEVEL_GROUP,
+            tagMap,
+            name,
+            false,
+            description
+        );
+

Review comment:
       Please remove empty line.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -629,17 +663,20 @@ private void setupRemoveSensorsTest(final Metrics metrics,
     }
 
     @Test
-    public void shouldRemoveClientLevelMetrics() {
+    public void shouldRemoveClientLevelMetricsAndSensors() {
         final Metrics metrics = niceMock(Metrics.class);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
-        addSensorsOnAllLevels(metrics, streamsMetrics);
+        final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
         resetToDefault(metrics);
-        expect(metrics.removeMetric(metricName1)).andStubReturn(null);
-        expect(metrics.removeMetric(metricName2)).andStubReturn(null);
-        replay(metrics);
 

Review comment:
       Please remove this empty line.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -629,17 +663,20 @@ private void setupRemoveSensorsTest(final Metrics metrics,
     }
 
     @Test
-    public void shouldRemoveClientLevelMetrics() {
+    public void shouldRemoveClientLevelMetricsAndSensors() {
         final Metrics metrics = niceMock(Metrics.class);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
-        addSensorsOnAllLevels(metrics, streamsMetrics);
+        final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
         resetToDefault(metrics);
-        expect(metrics.removeMetric(metricName1)).andStubReturn(null);
-        expect(metrics.removeMetric(metricName2)).andStubReturn(null);
-        replay(metrics);
 
-        streamsMetrics.removeAllClientLevelMetrics();
+        metrics.removeSensor(sensorKeys.getValues().get(0));
+        metrics.removeSensor(sensorKeys.getValues().get(1));
+
+        expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
+        expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));
+        replay(metrics);
 
+        streamsMetrics.removeAllClientLevelSensorsAndMetrics();
         verify(metrics);

Review comment:
       ```suggestion
           metrics.removeSensor(sensorKeys.getValues().get(0));
           metrics.removeSensor(sensorKeys.getValues().get(1));
           expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
           expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));
           replay(metrics);
   
           streamsMetrics.removeAllClientLevelSensorsAndMetrics();
   
           verify(metrics);
   ```







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r526352796



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) thr
             builtInMetricsVersion
         );
         checkCacheMetrics(builtInMetricsVersion);
-
+        verifyFailedStreamThreadsSensor(0.0);

Review comment:
       I think you should try using the custom processor. You can find an example in `StreamsUncaughtExceptionHandlerIntegrationTest.java`







[GitHub] [kafka] mjsax commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533639922



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
+            final Sensor sensor = metrics.getSensor(fullSensorName);

Review comment:
       I am fine either way, too, but I prefer consistency... So should we rewrite the other method as a side cleanup?







[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533570535



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
+            final Sensor sensor = metrics.getSensor(fullSensorName);

Review comment:
       I'm good either way (: 







[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r534256605



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
+            final Sensor sensor = metrics.getSensor(fullSensorName);

Review comment:
       I changed it back for consistency and will open a follow-up PR to update both of them to the new syntax.







[GitHub] [kafka] mjsax commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533020449



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
+            final Sensor sensor = metrics.getSensor(fullSensorName);

Review comment:
       Should we rewrite this the same way `threadLevelSensor` is written (i.e., using `orElseGet`) for consistency?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -629,16 +657,18 @@ private void setupRemoveSensorsTest(final Metrics metrics,
     }
 
     @Test
-    public void shouldRemoveClientLevelMetrics() {
+    public void shouldRemoveClientLevelMetricsAndSensors() {
         final Metrics metrics = niceMock(Metrics.class);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
-        addSensorsOnAllLevels(metrics, streamsMetrics);
+        final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
         resetToDefault(metrics);
-        expect(metrics.removeMetric(metricName1)).andStubReturn(null);
-        expect(metrics.removeMetric(metricName2)).andStubReturn(null);
-        replay(metrics);
 
-        streamsMetrics.removeAllClientLevelMetrics();
+        metrics.removeSensor(sensorKeys.getValues().get(0));
+        metrics.removeSensor(sensorKeys.getValues().get(1));
+        expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
+        expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));

Review comment:
       Why did we change this from `andStubReturn(null)` to `andReturn(mock(KafkaMetric.class))`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
##########
@@ -125,4 +131,16 @@ public static void addNumAliveStreamThreadMetric(final StreamsMetricsImpl stream
             stateProvider
         );
     }
+    public static Sensor failedStreamThreadSensor(final StreamsMetricsImpl streamsMetrics) {

Review comment:
       nit: missing empty line







[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533641540



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
+            final Sensor sensor = metrics.getSensor(fullSensorName);

Review comment:
       I am fine with consistency and clean-up, but I would like to have the clean-up in a separate PR.







[GitHub] [kafka] mjsax merged pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #9614:
URL: https://github.com/apache/kafka/pull/9614


   





[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r527724338



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) thr
             builtInMetricsVersion
         );
         checkCacheMetrics(builtInMetricsVersion);
-
+        verifyFailedStreamThreadsSensor(0.0);

Review comment:
       After looking at both test classes, I think it might actually make the most sense to put the test for this metric in `StreamsUncaughtExceptionHandlerIntegrationTest`, since the metric is so closely aligned with the exception handler anyway and the setup works nicely with what we're trying to test. Given the size + complexity of the other test classes, creating an overloaded processor for one test out of 20+ seems tricky.







[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r526355838



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1070,6 +1070,10 @@ private Thread shutdownHelper(final boolean error) {
             adminClient.close();
 
             streamsMetrics.removeAllClientLevelMetrics();

Review comment:
       Ahh, got mixed up when I rebased trunk







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r526347359



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1070,6 +1070,10 @@ private Thread shutdownHelper(final boolean error) {
             adminClient.close();
 
             streamsMetrics.removeAllClientLevelMetrics();

Review comment:
       This seems wrong. There are a few duplicate calls here

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -219,6 +220,8 @@ State setState(final State newState) {
             } else if (!state.isValidTransition(newState)) {
                 log.error("Unexpected state transition from {} to {}", oldState, newState);
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
+            } else if (newState == State.DEAD) {
+                failedStreamThreadSensor.record();
             } else {
                 log.info("State transition from {} to {}", oldState, newState);

Review comment:
       We probably don't want to skip this log. Can you move the sensor in here?







[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r527122772



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -221,6 +221,9 @@ State setState(final State newState) {
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
             } else {
                 log.info("State transition from {} to {}", oldState, newState);
+                if (newState == State.DEAD) {
+                    failedStreamThreadSensor.record();
+                }

Review comment:
       No, that would be in `StreamThread#runLoop()`.







[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r526880412



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1070,7 +1070,9 @@ private Thread shutdownHelper(final boolean error) {
             adminClient.close();
 
             streamsMetrics.removeAllClientLevelMetrics();
+            streamsMetrics.removeAllClientLevelSensors();

Review comment:
       Could you please add a public method to `StreamsMetricsImpl` named `removeAllClientLevelSensorsAndMetrics()` that calls `removeAllClientLevelMetrics()` and `removeAllClientLevelSensors()` and make the latter two methods `private`?
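
A minimal sketch of the shape requested above, using only JDK collections. The class `ClientLevelRegistry`, its string-based names, and the `removed` log are hypothetical stand-ins and not the actual `StreamsMetricsImpl` code; only the three `removeAll...` method names come from the comment. The order of the two private calls is also an assumption.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

// Hypothetical stand-in for StreamsMetricsImpl; names are plain strings here.
class ClientLevelRegistry {
    private final Deque<String> clientLevelMetrics = new LinkedList<>();
    private final Deque<String> clientLevelSensors = new LinkedList<>();
    // Records what was removed, in order, so the behaviour can be observed.
    final List<String> removed = new ArrayList<>();

    void addMetric(final String name) {
        synchronized (clientLevelMetrics) {
            clientLevelMetrics.push(name);
        }
    }

    void addSensor(final String name) {
        synchronized (clientLevelSensors) {
            clientLevelSensors.push(name);
        }
    }

    // Single public entry point, so a caller cannot remove one kind and forget the other.
    public final void removeAllClientLevelSensorsAndMetrics() {
        removeAllClientLevelSensors();
        removeAllClientLevelMetrics();
    }

    private void removeAllClientLevelMetrics() {
        synchronized (clientLevelMetrics) {
            while (!clientLevelMetrics.isEmpty()) {
                removed.add("metric:" + clientLevelMetrics.pop());
            }
        }
    }

    private void removeAllClientLevelSensors() {
        synchronized (clientLevelSensors) {
            while (!clientLevelSensors.isEmpty()) {
                removed.add("sensor:" + clientLevelSensors.pop());
            }
        }
    }
}
```

With this shape, the call site in `shutdownHelper` would need only the one combined call, and unit tests can target the combined method directly.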

##########
File path: streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
##########
@@ -60,6 +65,7 @@ private ClientMetrics() {}
         "The description of the topology executed in the Kafka Streams client";
     private static final String STATE_DESCRIPTION = "The state of the Kafka Streams client";
     private static final String ALIVE_STREAM_THREADS_DESCRIPTION = "The current number of alive stream threads that are running or participating in rebalance";
+    private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The number of failed stream threads so far for a given Kafka Streams client";

Review comment:
       ```suggestion
       private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The number of failed stream threads since the start of the Kafka Streams client";
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -93,6 +93,7 @@ public int hashCode() {
 
     private final Version version;
     private final Deque<MetricName> clientLevelMetrics = new LinkedList<>();
+    private final Map<String, Deque<String>> clientLevelSensors = new HashMap<>();

Review comment:
       Here you should just need a queue, as for `clientLevelMetrics`. We need a map for the other levels because there can be multiple objects on each level, e.g., there might be multiple stream threads, and each one manages its sensors under a key in the map. However, there is only one client on the client level.
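
The distinction can be sketched as follows, assuming a hypothetical `SensorBookkeeping` class that tracks sensor names as plain strings. The field names mirror the ones in the diff, but the class and its methods are illustrative only and are not the real `StreamsMetricsImpl`.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical bookkeeping class; the real fields live in StreamsMetricsImpl.
class SensorBookkeeping {
    // Many stream threads per client: one queue of sensor names per thread ID.
    private final Map<String, Deque<String>> threadLevelSensors = new HashMap<>();
    // Exactly one client: a single queue is enough, no key needed.
    private final Deque<String> clientLevelSensors = new LinkedList<>();

    void addThreadLevelSensor(final String threadId, final String sensorName) {
        threadLevelSensors.computeIfAbsent(threadId, id -> new LinkedList<>()).push(sensorName);
    }

    void addClientLevelSensor(final String sensorName) {
        clientLevelSensors.push(sensorName);
    }

    int threadLevelSensorCount(final String threadId) {
        final Deque<String> sensors = threadLevelSensors.get(threadId);
        return sensors == null ? 0 : sensors.size();
    }

    int clientLevelSensorCount() {
        return clientLevelSensors.size();
    }
}
```

The map key exists only to tell the owners apart; with a single owner there is nothing to tell apart, so the extra level of indirection would only add lookups and an invented key.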

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -253,6 +268,16 @@ public final void removeAllClientLevelMetrics() {
         }
     }
 
+    public final void removeAllClientLevelSensors() {

Review comment:
       Unit tests for this method are missing. Please also consider my comment in class `KafkaStreams` for these unit tests. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -221,6 +221,9 @@ State setState(final State newState) {
                 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
             } else {
                 log.info("State transition from {} to {}", oldState, newState);
+                if (newState == State.DEAD) {
+                    failedStreamThreadSensor.record();
+                }

Review comment:
       Not every dead stream thread is a failed stream thread. You should record this metric where the uncaught exception handler is called, because there we know that a stream thread died unexpectedly.
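
A rough, JDK-only illustration of why the recording point matters. `ModelStreamThread` and the `AtomicLong` counter are hypothetical stand-ins for `StreamThread` and the sensor, and the plain `Thread.UncaughtExceptionHandler` used here is an assumption made for the sketch, not the handler wiring in Kafka Streams.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a stream thread; not the real StreamThread class.
class ModelStreamThread extends Thread {
    enum State { RUNNING, DEAD }

    private final Runnable work;
    private volatile State state = State.RUNNING;

    ModelStreamThread(final Runnable work, final AtomicLong failedThreads) {
        this.work = work;
        // The handler only runs when an exception escapes run(),
        // so this is where a failure (not just a death) is known.
        setUncaughtExceptionHandler((thread, error) -> failedThreads.incrementAndGet());
    }

    @Override
    public void run() {
        try {
            work.run();
        } finally {
            // Reached on clean shutdown and on failure alike,
            // which is why counting here would over-report failures.
            state = State.DEAD;
        }
    }

    State currentState() {
        return state;
    }
}
```

Running one thread whose work returns normally and one whose work throws leaves both in `DEAD`, but only the second increments the counter; counting on the `DEAD` transition would have reported two failures.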

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) thr
             builtInMetricsVersion
         );
         checkCacheMetrics(builtInMetricsVersion);
-
+        verifyFailedStreamThreadsSensor(0.0);

Review comment:
       I would put the test for whether the metric is recorded correctly in `StreamThreadTest`. An example of such a test is `shouldLogAndRecordSkippedRecordsForInvalidTimestamps()`. I do not think an integration test is needed. The test for the existence of the metric, i.e., `checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1);`, should stay here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##########
@@ -99,6 +121,27 @@ public void shouldAddAliveStreamThreadsMetric() {
         );
     }
 
+    @Test
+    public void shouldGetFailedStreamThreadsSensor() {
+        final String name = "failed-stream-threads";
+        final String description = "The number of failed stream threads so far for a given Kafka Streams client";
+        expect(streamsMetrics.clientLevelSensor(name, RecordingLevel.INFO)).andReturn(expectedSensor);
+        expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap);
+        StreamsMetricsImpl.addSumMetricToSensor(
+            expectedSensor,
+            CLIENT_LEVEL_GROUP,
+            tagMap,
+            name,
+            false,
+            description
+        );
+        replay(StreamsMetricsImpl.class, streamsMetrics);
+
+        final Sensor sensor = ClientMetrics.failedStreamThreadSensor(streamsMetrics);

Review comment:
       nit: I like to insert a blank line after the call to test to visually separate setup, call, and verification.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,

Review comment:
       Unit tests for this method are missing.







[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533585639



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##########
@@ -629,16 +657,18 @@ private void setupRemoveSensorsTest(final Metrics metrics,
     }
 
     @Test
-    public void shouldRemoveClientLevelMetrics() {
+    public void shouldRemoveClientLevelMetricsAndSensors() {
         final Metrics metrics = niceMock(Metrics.class);
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
-        addSensorsOnAllLevels(metrics, streamsMetrics);
+        final Capture<String> sensorKeys = addSensorsOnAllLevels(metrics, streamsMetrics);
         resetToDefault(metrics);
-        expect(metrics.removeMetric(metricName1)).andStubReturn(null);
-        expect(metrics.removeMetric(metricName2)).andStubReturn(null);
-        replay(metrics);
 
-        streamsMetrics.removeAllClientLevelMetrics();
+        metrics.removeSensor(sensorKeys.getValues().get(0));
+        metrics.removeSensor(sensorKeys.getValues().get(1));
+        expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
+        expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));

Review comment:
       Must've been an accidental change when trying to get the test to work. `shouldRemoveStateStoreLevelSensors` uses `andReturn(mock(KafkaMetric.class))`, so that's where it came from, but this test works with `andStubReturn(null)`, so I changed it back to that.







[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r533148335



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -214,6 +215,20 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
         }
     }
 
+    public final Sensor clientLevelSensor(final String sensorName,
+                                          final RecordingLevel recordingLevel,
+                                          final Sensor... parents) {
+        synchronized (clientLevelSensors) {
+            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
+            final Sensor sensor = metrics.getSensor(fullSensorName);

Review comment:
       I requested this. See my comment https://github.com/apache/kafka/pull/9614#discussion_r529313022







[GitHub] [kafka] mjsax commented on pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#issuecomment-738231066


   Retest this please





[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

Posted by GitBox <gi...@apache.org>.
lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r526331025



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##########
@@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) thr
             builtInMetricsVersion
         );
         checkCacheMetrics(builtInMetricsVersion);
-
+        verifyFailedStreamThreadsSensor(0.0);

Review comment:
       We also need to verify that the metric works when there is a failed stream thread. The options are (1) create a custom processor now and (IIRC) run the test suite twice, once with failing stream threads and once without, to confirm that the metric works. I'm not sure whether the custom processor will let us fail just one stream thread right before closing the app. Or (2) wait until add/remove stream threads is implemented, then remove some threads and test the metric before closing the app. WDYT?



