You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "joobisb (via GitHub)" <gi...@apache.org> on 2023/06/29 10:58:33 UTC

[GitHub] [kafka] joobisb opened a new pull request, #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

joobisb opened a new pull request, #13931:
URL: https://github.com/apache/kafka/pull/13931

   Removed the references to `MockStreamsMetrics` since it actually was not a mock, instead used Mockito to mock the implementations. With this, we achieve better isolation of unit tests. In some unit tests the `StreamsMetricsImpl` implementation is used to assert the values and hence cannot be mocked. However, `MockStreamsMetrics` is removed from the majority of the unit tests where it is referenced.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] joobisb commented on a diff in pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1257444324


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -140,6 +154,15 @@ public class RecordCollectorTest {
 
     @Before
     public void setup() {
+
+        PowerMock.mockStatic(StreamsMetricsImpl.class);

Review Comment:
   upon checking, the static mocks was not really needed for the newly added mock, hence removed that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] joobisb commented on a diff in pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1257444324


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -140,6 +154,15 @@ public class RecordCollectorTest {
 
     @Before
     public void setup() {
+
+        PowerMock.mockStatic(StreamsMetricsImpl.class);

Review Comment:
   seems the static mocks was not needed for the newly added mock, hence removed that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1265659668


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -93,13 +99,19 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)

Review Comment:
   ```suggestion
   
   @RunWith(MockitoJUnitRunner.StrictStubs.class)
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -186,57 +205,25 @@ public void cleanup() {
     public void shouldRecordRecordsAndBytesProduced() {
         final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
 
-        final String threadId = Thread.currentThread().getName();
-        final String processorNodeId = sinkNodeName;
-        final String topic = "topic";
-        final Metric recordsProduced = streamsMetrics.metrics().get(
-            new MetricName("records-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of records produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-        final Metric bytesProduced = streamsMetrics.metrics().get(
-            new MetricName("bytes-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of bytes produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-
-        double totalRecords = 0D;
-        double totalBytes = 0D;
+        final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);

Review Comment:
   If you use the static mock only in this method, I would suggest to use a `try-resource`-clause.
   
   ```java
   try (final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class)) {
   ...
   }
   ```
   without the `topicMetrics.close()` at end. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -186,57 +205,25 @@ public void cleanup() {
     public void shouldRecordRecordsAndBytesProduced() {
         final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
 
-        final String threadId = Thread.currentThread().getName();
-        final String processorNodeId = sinkNodeName;
-        final String topic = "topic";
-        final Metric recordsProduced = streamsMetrics.metrics().get(
-            new MetricName("records-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of records produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-        final Metric bytesProduced = streamsMetrics.metrics().get(
-            new MetricName("bytes-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of bytes produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-
-        double totalRecords = 0D;
-        double totalBytes = 0D;
+        final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);
 
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        when(TopicMetrics.producedSensor(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(StreamsMetricsImpl.class)
+        )).thenReturn(mockSensor);

Review Comment:
   nit:
   we usually use 4 spaces:
   ```suggestion
           when(TopicMetrics.producedSensor(
               Mockito.anyString(),
               Mockito.anyString(),
               Mockito.anyString(),
               Mockito.anyString(),
               Mockito.any(StreamsMetricsImpl.class)
           )).thenReturn(mockSensor);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -186,57 +205,25 @@ public void cleanup() {
     public void shouldRecordRecordsAndBytesProduced() {
         final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
 
-        final String threadId = Thread.currentThread().getName();
-        final String processorNodeId = sinkNodeName;
-        final String topic = "topic";
-        final Metric recordsProduced = streamsMetrics.metrics().get(
-            new MetricName("records-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of records produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-        final Metric bytesProduced = streamsMetrics.metrics().get(
-            new MetricName("bytes-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of bytes produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-
-        double totalRecords = 0D;
-        double totalBytes = 0D;
+        final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);
 
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        when(TopicMetrics.producedSensor(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(StreamsMetricsImpl.class)
+        )).thenReturn(mockSensor);
 
         collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(0));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        double bytes = producerRecordSizeInBytes(mockProducer.history().get(0));
 
         collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(1));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
-
-        collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(2));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        bytes = producerRecordSizeInBytes(mockProducer.history().get(1));

Review Comment:
   Also here
   ```suggestion
           final double bytesWithHeaders = producerRecordSizeInBytes(mockProducer.history().get(1));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -471,23 +458,30 @@ public Optional<Set<Integer>> partitions(final String topic, final String key, f
                 taskId,
                 streamsProducer,
                 productionExceptionHandler,
-                streamsMetrics,
+                mockStreamsMetrics,
                 topology
         );
 
         final String topic = "topic";
 
-        final Metric recordsDropped = streamsMetrics.metrics().get(new MetricName(
+        final MetricName recordDropMetricName = new MetricName(
                 "dropped-records-total",
                 "stream-task-metrics",
                 "The total number of dropped records",
                 mkMap(
                         mkEntry("thread-id", Thread.currentThread().getName()),
                         mkEntry("task-id", taskId.toString())
                 )
-        ));
+        );
+
+        final Map<MetricName, Metric> mockMetricsMap = new HashMap<>();
+        final Metric recordDropMetric = Mockito.mock(Metric.class);
 
+        when(recordDropMetric.metricValue()).thenReturn(9.0);
+        mockMetricsMap.put(recordDropMetricName, recordDropMetric);
+        doReturn(mockMetricsMap).when(mockStreamsMetrics).metrics();
 
+        final Metric recordsDropped = mockStreamsMetrics.metrics().get(recordDropMetricName);

Review Comment:
   With this and the assert on line 501 `assertThat(recordsDropped.metricValue(), equalTo(9.0));` you basically test if you set the mocks correctly in the test. However, what you want to test is that the call under test -- i.e. `collector.send()` calls the sensor correctly. That means, you need to setup a sensor mock and verify the calls on it.
   
   Since streams metrics is a mock now, some tests need to be changed on how they test the behavior. Without the mock, we have to verify the actual metric. With the mock, we do not need to test the actual metric, we can mock the metrics. 
   
   The code should be something like:
   ```java
   @Test
   public void shouldDropAllRecords() {
   
       class DroppingPartitioner implements StreamPartitioner<String, Object> {
   
           @Override
           @Deprecated
           public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
               return null;
           }
   
           @Override
           public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) {
               return Optional.of(Collections.emptySet());
           }
       }
   
       try (final MockedStatic<TaskMetrics> taskMetrics = mockStatic(TaskMetrics.class)) {
           final Sensor droppedRecordsSensor = Mockito.mock(Sensor.class);
           when(TaskMetrics.droppedRecordsSensor(
               anyString(),
               eq(taskId.toString()),
               eq(mockStreamsMetrics))
           ).thenReturn(droppedRecordsSensor);
   
           final DroppingPartitioner droppingPartitioner = new DroppingPartitioner();
   
           final SinkNode<?, ?> sinkNode = new SinkNode<>(
               sinkNodeName,
               new StaticTopicNameExtractor<>(topic),
               stringSerializer,
               byteArraySerializer,
               droppingPartitioner);
           topology = new ProcessorTopology(
               emptyList(),
               emptyMap(),
               singletonMap(topic, sinkNode),
               emptyList(),
               emptyList(),
               emptyMap(),
               emptySet()
           );
           collector = new RecordCollectorImpl(
               logContext,
               taskId,
               streamsProducer,
               productionExceptionHandler,
               mockStreamsMetrics,
               topology
           );
   
           final String topic = "topic";
           final Headers headers = new RecordHeaders(new Header[] {new RecordHeader("key", "value".getBytes())});
   
           collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "9", "0", null, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "27", "0", null, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "81", "0", null, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "243", "0", null, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "28", "0", headers, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "82", "0", headers, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "244", "0", headers, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
           collector.send(topic, "245", "0", null, null, stringSerializer, stringSerializer, null, context, droppingPartitioner);
   
           final Map<TopicPartition, Long> offsets = collector.offsets();
           assertTrue(offsets.isEmpty());
   
           assertEquals(0, mockProducer.history().size());
           Mockito.verify(droppedRecordsSensor, times(9)).record();
   
           // returned offsets should not be modified
           final TopicPartition topicPartition = new TopicPartition(topic, 0);
           assertThrows(UnsupportedOperationException.class, () -> offsets.put(topicPartition, 50L));
       }
   }
   ```
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -135,11 +147,18 @@ public class RecordCollectorTest {
     private StreamsProducer streamsProducer;
     private ProcessorTopology topology;
     private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext<>();
-
     private RecordCollectorImpl collector;
+    final Sensor mockSensor = Mockito.mock(Sensor.class);
 
     @Before
     public void setup() {
+
+        when(mockStreamsMetrics.taskLevelSensor(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
+                Mockito.any(Sensor.RecordingLevel.class), Mockito.any(Sensor[].class))).thenReturn(mockSensor);

Review Comment:
   nit: 
   Just for better readability
   ```suggestion
           when(
               mockStreamsMetrics.taskLevelSensor(
                   Mockito.anyString(), 
                   Mockito.anyString(), 
                   Mockito.anyString(),
                   Mockito.any(Sensor.RecordingLevel.class), 
                   Mockito.any(Sensor[].class)
           )).thenReturn(mockSensor);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1408,20 +1409,29 @@ public void shouldThrowStreamsExceptionUsingDefaultExceptionHandler() {
     public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler() {
         try (final ErrorStringSerializer errorSerializer = new ErrorStringSerializer()) {
             final RecordCollector collector = newRecordCollector(new AlwaysContinueProductionExceptionHandler());
+            final MetricName metricName = new MetricName(
+                    "dropped-records-total",
+                    "stream-task-metrics",
+                    "The total number of dropped records",
+                    mkMap(
+                            mkEntry("thread-id", Thread.currentThread().getName()),
+                            mkEntry("task-id", taskId.toString())
+                    ));
+
+            final Map<MetricName, Metric> mockMetricsMap = new HashMap<>();
+            final Metric metric = Mockito.mock(Metric.class);
+
+            when(metric.metricValue()).thenReturn(1.0);
+            mockMetricsMap.put(metricName, metric);
+            doReturn(mockMetricsMap).when(mockStreamsMetrics).metrics();
+

Review Comment:
   Here the code also just verifies the mock setup, it does not verify any production code in `RecordCollectorImpl`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1260,15 +1254,22 @@ public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContin
             );
         }
 
-        final Metric metric = streamsMetrics.metrics().get(new MetricName(
-            "dropped-records-total",
-            "stream-task-metrics",
-            "The total number of dropped records",
-            mkMap(
-                mkEntry("thread-id", Thread.currentThread().getName()),
-                mkEntry("task-id", taskId.toString())
-            )
-        ));
+        final Map<MetricName, Metric> mockMetricsMap = new HashMap<>();
+        final MetricName metricName = new MetricName(
+                "dropped-records-total",
+                "stream-task-metrics",
+                "The total number of dropped records",
+                mkMap(
+                        mkEntry("thread-id", Thread.currentThread().getName()),
+                        mkEntry("task-id", taskId.toString())
+                )
+        );
+        final Metric mockMetric = Mockito.mock(Metric.class);
+        when(mockMetric.metricValue()).thenReturn(1.0);
+        mockMetricsMap.put(metricName, mockMetric);
+        doReturn(mockMetricsMap).when(mockStreamsMetrics).metrics();
+
+        final Metric metric = mockStreamsMetrics.metrics().get(metricName);

Review Comment:
   This change has the same issue as above



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -186,57 +205,25 @@ public void cleanup() {
     public void shouldRecordRecordsAndBytesProduced() {
         final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
 
-        final String threadId = Thread.currentThread().getName();
-        final String processorNodeId = sinkNodeName;
-        final String topic = "topic";
-        final Metric recordsProduced = streamsMetrics.metrics().get(
-            new MetricName("records-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of records produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-        final Metric bytesProduced = streamsMetrics.metrics().get(
-            new MetricName("bytes-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of bytes produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-
-        double totalRecords = 0D;
-        double totalBytes = 0D;
+        final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);
 
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        when(TopicMetrics.producedSensor(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(StreamsMetricsImpl.class)
+        )).thenReturn(mockSensor);
 
         collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(0));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        double bytes = producerRecordSizeInBytes(mockProducer.history().get(0));
 
         collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(1));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
-
-        collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(2));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        bytes = producerRecordSizeInBytes(mockProducer.history().get(1));
 
-        collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(3));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
-
-        collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(4));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        Mockito.verify(mockSensor).record(eq(bytes), anyLong());
+        Mockito.verify(mockSensor).record(eq(bytes), anyLong());

Review Comment:
   and should then be
   ```suggestion
           Mockito.verify(mockSensor).record(eq(bytesWithoutHeaders), anyLong());
           Mockito.verify(mockSensor).record(eq(bytesWithHeaders), anyLong());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -135,11 +147,18 @@ public class RecordCollectorTest {
     private StreamsProducer streamsProducer;
     private ProcessorTopology topology;
     private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext<>();
-
     private RecordCollectorImpl collector;
+    final Sensor mockSensor = Mockito.mock(Sensor.class);
 
     @Before
     public void setup() {
+
+        when(mockStreamsMetrics.taskLevelSensor(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
+                Mockito.any(Sensor.RecordingLevel.class), Mockito.any(Sensor[].class))).thenReturn(mockSensor);
+        when(mockStreamsMetrics.topicLevelSensor(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString(), Mockito.any(Sensor.RecordingLevel.class),
+                Mockito.any(Sensor[].class))).thenReturn(mockSensor);

Review Comment:
   nit:
   ```suggestion
           when(
               mockStreamsMetrics.topicLevelSensor(
                   Mockito.anyString(), 
                   Mockito.anyString(), 
                   Mockito.anyString(),
                   Mockito.anyString(), 
                   Mockito.anyString(), 
                   Mockito.any(Sensor.RecordingLevel.class),
                   Mockito.any(Sensor[].class)
           )).thenReturn(mockSensor);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -186,57 +205,25 @@ public void cleanup() {
     public void shouldRecordRecordsAndBytesProduced() {
         final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
 
-        final String threadId = Thread.currentThread().getName();
-        final String processorNodeId = sinkNodeName;
-        final String topic = "topic";
-        final Metric recordsProduced = streamsMetrics.metrics().get(
-            new MetricName("records-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of records produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-        final Metric bytesProduced = streamsMetrics.metrics().get(
-            new MetricName("bytes-produced-total",
-                           TOPIC_LEVEL_GROUP,
-                           "The total number of bytes produced from this topic",
-                           streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-        );
-
-        double totalRecords = 0D;
-        double totalBytes = 0D;
+        final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);
 
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        when(TopicMetrics.producedSensor(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.any(StreamsMetricsImpl.class)
+        )).thenReturn(mockSensor);
 
         collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
-        ++totalRecords;
-        totalBytes += producerRecordSizeInBytes(mockProducer.history().get(0));
-        assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-        assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+        double bytes = producerRecordSizeInBytes(mockProducer.history().get(0));

Review Comment:
   I realized now that my suggestion in my previous review did make too much sense for some parts.
   ```suggestion
           final double bytesWithoutHeaders = producerRecordSizeInBytes(mockProducer.history().get(0));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1945902179

   > @joobisb Could you please fix the compilation errors?
   
   @cadonna done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1253107694


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -211,30 +242,50 @@ public void shouldRecordRecordsAndBytesProduced() {
         collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
         ++totalRecords;
         totalBytes += producerRecordSizeInBytes(mockProducer.history().get(0));
+
+        when(recordMockMetric.metricValue()).thenReturn(1.0);
+        when(byteMockMetric.metricValue()).thenReturn(29.0);
+
         assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
         assertThat(bytesProduced.metricValue(), equalTo(totalBytes));

Review Comment:
   I am afraid that is not what we should test. You basically put metric mocks `recordMockMetric` and `byteMockMetric` into a map (lines 230 and 231) and then read the same mocks from the map and assign them to `recordsProduced` and `bytesProduced` (lines 233 and 234), respectively. Until then, no code under test was executed. After that you specify the stubs on mocks `recordMockMetric` and `byteMockMetric` (lines 246 and 247) and assert that the stubs return the specified values (lines 249 and 250). With this you have not tested any production code. You just tested your stub definitions.
   
   The original test does also test a bit too much, IMO. The original test verified the recorded values by using the actual metrics. But it also tests that the metrics compute the correct values. That is actually already tested elsewhere (for example [here](https://github.com/apache/kafka/blob/39a555ba94a6a5d851b31e0a7f07e19c48327835/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java#L1212)) and actually does not need to be tested also in this test. Now that we use a mock, I would just test that method `record()` is called on the sensor when executing `collector.send()`. The test would then reduce to the following:
   ```java
   @Test
       public void shouldRecordRecordsAndBytesProduced() {
           final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
   
           collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
           double bytes = producerRecordSizeInBytes(mockProducer.history().get(0));
   
           collector.send(topic, "999", "0", headers, 1, null, stringSerializer, stringSerializer, sinkNodeName, context);
           bytes = producerRecordSizeInBytes(mockProducer.history().get(1));
   
           Mockito.verify(producedMockSensor).record(eq(bytes), anyLong());
           Mockito.verify(producedMockSensor).record(eq(bytes), anyLong());
       }
   ``` 
   
   To use the test I propose, you need to add the following static mock as a field:
   ```
   final private MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);
   ```
   
   Do not forget to close this static mock in `cleanup()` otherwise tests will behave strangely:
   
   ```java
    @After
       public void cleanup() {
           collector.closeClean();
           topicMetrics.close();
       }
   ```
   and you need to specify two stubs in `setup()`:
   
   ```java
   when(TopicMetrics.producedSensor(
       threadId,
       taskId.toString(),
       processorNodeId,
       topic,
       mockStreamsMetrics
   )).thenReturn(producedMockSensor);
   when(TaskMetrics.droppedRecordsSensor(
       threadId,
       taskId.toString(),
       mockStreamsMetrics
   )).thenReturn(droppedMockSensor);
   
   ``` 
   
   Those stubs are called by the production code to get the sensors.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -140,6 +154,15 @@ public class RecordCollectorTest {
 
     @Before
     public void setup() {
+
+        PowerMock.mockStatic(StreamsMetricsImpl.class);

Review Comment:
   You can use Mockito to mock static methods. See https://github.com/apache/kafka/blob/8026a0edd8c309567ae075e5f4bcc4328ea91d9f/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/NamedCacheMetricsTest.java#L55
   
   We want to move away from EasyMock/PowerMock. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1489430066


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -138,11 +150,30 @@ public class RecordCollectorTest {
     private StreamsProducer streamsProducer;
     private ProcessorTopology topology;
     private final InternalProcessorContext<Void, Void> context = new InternalMockProcessorContext<>();
-
     private RecordCollectorImpl collector;
+    final Sensor mockSensor = Mockito.mock(Sensor.class);

Review Comment:
   ```suggestion
       private final Sensor mockSensor = Mockito.mock(Sensor.class);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1279,49 +1268,50 @@ public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueE
 
     @Test
     public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
-        final RecordCollector collector = new RecordCollectorImpl(
-            logContext,
-            taskId,
-            getExceptionalStreamsProducerOnSend(new Exception()),
-            new AlwaysContinueProductionExceptionHandler(),
-            streamsMetrics,
-            topology
-        );
+        try (final MockedStatic<TaskMetrics> taskMetrics = mockStatic(TaskMetrics.class)) {
+            final Sensor droppedRecordsSensor = Mockito.mock(Sensor.class);
+            when(TaskMetrics.droppedRecordsSensor(
+                    Mockito.anyString(),
+                    eq(taskId.toString()),
+                    eq(mockStreamsMetrics))
+            ).thenReturn(droppedRecordsSensor);
+
+            final RecordCollector collector = new RecordCollectorImpl(
+                logContext,
+                taskId,
+                getExceptionalStreamsProducerOnSend(new Exception()),
+                new AlwaysContinueProductionExceptionHandler(),
+                mockStreamsMetrics,
+                topology
+            );
 
-        try (final LogCaptureAppender logCaptureAppender =
-                 LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
-            logCaptureAppender.setThreshold(Level.INFO);
+            try (final LogCaptureAppender logCaptureAppender =
+                     LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
+                logCaptureAppender.setThreshold(Level.INFO);
 
-            collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
-            collector.flush();
+                collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
+                collector.flush();
 
-            final List<String> messages = logCaptureAppender.getMessages();
-            final StringBuilder errorMessage = new StringBuilder("Messages received:");
-            for (final String error : messages) {
-                errorMessage.append("\n - ").append(error);
+                final List<String> messages = logCaptureAppender.getMessages();
+                final StringBuilder errorMessage = new StringBuilder("Messages received:");
+                for (final String error : messages) {
+                    errorMessage.append("\n - ").append(error);
+                }
+                assertTrue(
+                    errorMessage.toString(),
+                    messages.get(messages.size() - 1)
+                            .endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.")
+                );
             }
-            assertTrue(
-                errorMessage.toString(),
-                messages.get(messages.size() - 1)
-                    .endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.")
-            );
-        }
 
-        final Metric metric = streamsMetrics.metrics().get(new MetricName(
-            "dropped-records-total",
-            "stream-task-metrics",
-            "The total number of dropped records",
-            mkMap(
-                mkEntry("thread-id", Thread.currentThread().getName()),
-                mkEntry("task-id", taskId.toString())
-            )
-        ));
-        assertEquals(1.0, metric.metricValue());
+            Mockito.verify(droppedRecordsSensor, Mockito.times(1)).record();
 
-        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
-        collector.flush();
-        collector.closeClean();
+            collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
+            collector.flush();
+            collector.closeClean();
+        }
     }
+    

Review Comment:
   nit:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1831,7 +1848,7 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
             taskId,
             config,
             stateManager,
-            streamsMetrics,
+            this.mockStreamsMetrics,

Review Comment:
   nit: We do not use `this` if it is not needed:
    ```suggestion
               mockStreamsMetrics,
   ```
   Could you please adapt the remainder of this class accordingly?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1349,7 +1339,7 @@ public void abortTransaction() {
                 Time.SYSTEM
             ),
             productionExceptionHandler,
-            streamsMetrics,
+                mockStreamsMetrics,

Review Comment:
   Could please fix the indentation here and below?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java:
##########
@@ -62,12 +62,16 @@ public abstract class AbstractKeyValueStoreTest {
 
     @Before
     public void before() {
-        driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        driver = this.createKeyValueStoreTestDriver();
         context = (InternalMockProcessorContext) driver.context();
         context.setTime(10);
         store = createKeyValueStore(context);
     }
 
+    protected KeyValueStoreTestDriver<Integer, String> createKeyValueStoreTestDriver() {
+        return KeyValueStoreTestDriver.create(Integer.class, String.class);
+    }

Review Comment:
   Why do you not use the following: 
   ```
       abstract protected KeyValueStoreTestDriver<Integer, String> createKeyValueStoreTestDriver();
   ```
   
   Isn't this method overridden in all subclasses?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java:
##########
@@ -83,12 +90,26 @@ public void setUp() {
         cacheFlushListener = new CacheFlushListenerStub<>(new StringDeserializer(), new StringDeserializer());
         store = new CachingKeyValueStore(underlyingStore, false);
         store.setFlushListener(cacheFlushListener, false);
-        cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, this.mockStreamsMetrics);
         context = new InternalMockProcessorContext<>(null, null, null, null, cache);
         context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
         store.init((StateStoreContext) context, null);
     }
 
+    @Override
+    protected KeyValueStoreTestDriver<Integer, String> createKeyValueStoreTestDriver() {

Review Comment:
   I am wondering if it might not be better to specify the mock and all its stubs for all sensor levels once in `AbstractKeyValueStoreTest`. That would avoid code duplication. What do you think?  



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java:
##########
@@ -62,12 +62,16 @@ public abstract class AbstractKeyValueStoreTest {
 
     @Before
     public void before() {
-        driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        driver = this.createKeyValueStoreTestDriver();

Review Comment:
   ```suggestion
           driver = createKeyValueStoreTestDriver();
   ```



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java:
##########
@@ -115,7 +118,7 @@ public void setup() {
             new ThreadCache(
                 new LogContext("testCache"),
                 0,
-                new MockStreamsMetrics(new Metrics())));
+                this.mockStreamsMetrics));

Review Comment:
   ```suggestion
                   mockStreamsMetrics));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1497,7 +1487,7 @@ private RecordCollector newRecordCollector(final ProductionExceptionHandler prod
             taskId,
             streamsProducer,
             productionExceptionHandler,
-            streamsMetrics,
+                mockStreamsMetrics,

Review Comment:
   Could please fix the indentation here and below?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java:
##########
@@ -83,12 +90,26 @@ public void setUp() {
         cacheFlushListener = new CacheFlushListenerStub<>(new StringDeserializer(), new StringDeserializer());
         store = new CachingKeyValueStore(underlyingStore, false);
         store.setFlushListener(cacheFlushListener, false);
-        cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
+        cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, this.mockStreamsMetrics);

Review Comment:
   ```suggestion
           cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, mockStreamsMetrics);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -853,7 +842,7 @@ public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
             taskId,
             streamsProducer,
             productionExceptionHandler,
-            streamsMetrics,
+                mockStreamsMetrics,

Review Comment:
   Could please fix the indentation here and below?



##########
streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java:
##########
@@ -194,6 +200,13 @@ public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> ke
     private final InternalMockProcessorContext context;
     private final StateSerdes<K, V> stateSerdes;
 
+    @Mock
+    private StreamsMetricsImpl mockStreamsMetrics;
+
+    public void setMockStreamsMetrics(final StreamsMetricsImpl mockStreamsMetrics) {
+        this.mockStreamsMetrics = mockStreamsMetrics;
+    }

Review Comment:
   This method does not seem to be used anywhere.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java:
##########
@@ -109,8 +112,7 @@ public void setUp() {
             recordCollector,
             new ThreadCache(
                 new LogContext("testCache"),
-                0,
-                new MockStreamsMetrics(new Metrics())));
+                0, mockStreamsMetrics));

Review Comment:
   ```suggestion
                   0, 
                   mockStreamsMetrics));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] joobisb commented on pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1666466685

   Hi @cadonna ,
   
   As per the comment, I've setup mocks and verified the calls on it, also addressed other comments as well, please have a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1943338949

   @cadonna i've addressed the changes and fixed merge conflicts, could you please take a look
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] joobisb commented on pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1614288335

   @cadonna could you please have a look at this PR ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] joobisb commented on a diff in pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1257444754


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -211,30 +242,50 @@ public void shouldRecordRecordsAndBytesProduced() {
         collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
         ++totalRecords;
         totalBytes += producerRecordSizeInBytes(mockProducer.history().get(0));
+
+        when(recordMockMetric.metricValue()).thenReturn(1.0);
+        when(byteMockMetric.metricValue()).thenReturn(29.0);
+
         assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
         assertThat(bytesProduced.metricValue(), equalTo(totalBytes));

Review Comment:
   have updated the code, I needed to add the stubs and topicMetrics static mock inside the test. It was impacting other tests if I add it in setup().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] joobisb commented on pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

Posted by "joobisb (via GitHub)" <gi...@apache.org>.
joobisb commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1627655971

   @cadonna i have addressed the comments, could you please have a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1793324234

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has  merge conflicts, please update it with the latest from trunk (or appropriate release branch) <p> If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1943728460

   @joobisb Could you please fix the compilation errors?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org