Posted to commits@kafka.apache.org by mj...@apache.org on 2021/02/17 01:14:12 UTC

[kafka] branch 2.8 updated (1392382 -> 123eff6)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 1392382  MINOR: Remove unused LeaderAndIsrResponse.partitions() since it has been replaced with partitionErrors() (#10127)
     new ab17937  KAFKA-12272: Fix commit-interval metrics (#10102)
     new 123eff6  MINOR: Clarify config names for EOS versions 1 and 2 (#9670)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/streams/core-concepts.html                    |   2 +-
 docs/streams/developer-guide/config-streams.html   |   6 +-
 docs/streams/upgrade-guide.html                    |  11 +-
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../processor/internals/StreamThreadTest.java      | 116 ++++++++++++++++++++-
 5 files changed, 128 insertions(+), 12 deletions(-)


[kafka] 02/02: MINOR: Clarify config names for EOS versions 1 and 2 (#9670)

Posted by mj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 123eff618f50721140ccd368ed7f966f581ada90
Author: Jim Galasyn <ji...@confluent.io>
AuthorDate: Tue Feb 16 17:11:24 2021 -0800

    MINOR: Clarify config names for EOS versions 1 and 2 (#9670)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 docs/streams/core-concepts.html                  |  2 +-
 docs/streams/developer-guide/config-streams.html |  6 +++---
 docs/streams/upgrade-guide.html                  | 11 ++++++-----
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/docs/streams/core-concepts.html b/docs/streams/core-concepts.html
index ddb37ea..7391c02 100644
--- a/docs/streams/core-concepts.html
+++ b/docs/streams/core-concepts.html
@@ -300,7 +300,7 @@
 
         To enable exactly-once semantics when running Kafka Streams applications,
         set the <code>processing.guarantee</code> config value (default value is <b>at_least_once</b>)
-        to <b>exactly_once</b> (requires brokers version 0.11.0 or newer) or <b>exactly_once_beta</b> (requires brokers version 2.5 or newer).
+        to <b>exactly_once</b> for EOS version 1 (requires brokers version 0.11.0 or newer) or <b>exactly_once_beta</b> for EOS version 2 (requires brokers version 2.5 or newer).
         For more information, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html">Kafka Streams Configs</a> section.
     </p>
 
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 07b47c0..fe35026 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -294,7 +294,7 @@
           <tr class="row-even"><td>processing.guarantee</td>
             <td>Medium</td>
             <td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
-              <code class="docutils literal"><span class="pre">"exactly_once"</span></code>, or <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code></td>.
+              <code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1), or <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2)</td>.
             <td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantedd"><span class="std std-ref">Processing Guarantee</span></a></td>
           </tr>
           <tr class="row-odd"><td>poll.ms</td>
@@ -668,8 +668,8 @@
           <blockquote>
             <div>The processing guarantee that should be used.
               Possible values are <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default),
-              <code class="docutils literal"><span class="pre">"exactly_once"</span></code>,
-              and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>.
+              <code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1),
+              and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2).
               Using <code class="docutils literal"><span class="pre">"exactly_once"</span></code> requires broker
               version 0.11.0 or newer, while using <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code>
               requires broker version 2.5 or newer.
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 7bb2971..2a6a760 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -53,8 +53,9 @@
     </ul>
 
     <p>
-        Starting in Kafka Streams 2.6.x, a new processing mode <code>"exactly_once_beta"</code> (configurable via parameter
-        <code>processing.guarantee</code>) is available.
+        Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS version 2, which is configurable by setting 
+        <code>processing.guarantee</code> to <code>"exactly_once_beta"</code>.
+        <b>NOTE:</b> The <code>"exactly_once_beta"</code> processing mode is ready for production (<i>i.e.</i>, it's not "beta" software). 
         To use this new feature, your brokers must be on version 2.5.x or newer.
         A switch from <code>"exactly_once"</code> to <code>"exactly_once_beta"</code> (or the other way around) is
         only possible if the application is on version 2.6.x.
@@ -162,7 +163,7 @@
 
     <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
     <p>
-        We added a new processing mode that improves application scalability using exactly-once guarantees
+        We added a new processing mode, EOS version 2, that improves application scalability using exactly-once guarantees
         (via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a>).
         You can enable this new feature by setting the configuration parameter <code>processing.guarantee</code> to the
         new value <code>"exactly_once_beta"</code>.
@@ -894,9 +895,9 @@
 
     <p> Metrics using exactly-once semantics: </p>
     <p>
-        If <code>"exactly_once"</code> processing is enabled via the <code>processing.guarantee</code> parameter,
+        If <code>"exactly_once"</code> processing (EOS version 1) is enabled via the <code>processing.guarantee</code> parameter,
         internally Streams switches from a producer-per-thread to a producer-per-task runtime model.
-        Using <code>"exactly_once_beta"</code> does use a producer-per-thread, so <code>client.id</code> doesn't change,
+        Using <code>"exactly_once_beta"</code> (EOS version 2) does use a producer-per-thread, so <code>client.id</code> doesn't change,
         compared with <code>"at_least_once"</code> for this case).
         In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case.
         Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics.
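
For readers following the docs change above, here is a minimal sketch of how the two processing modes are selected in a Kafka Streams application; the application id, bootstrap servers, and topic names below are placeholders, not part of the commit:

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosConfigExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-config-example");  // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder

            // EOS version 1 (requires brokers 0.11.0 or newer):
            // props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            // EOS version 2 (requires brokers 2.5 or newer); production-ready despite the "beta" name:
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");  // placeholder topology

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

As the upgrade-guide text above notes, "exactly_once_beta" needs brokers on version 2.5.x or newer, and switching between the two values requires the application itself to be on 2.6.x or newer.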


[kafka] 01/02: KAFKA-12272: Fix commit-interval metrics (#10102)

Posted by mj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ab17937abcd6aee6192679baf7a0b4f1e0a15fe1
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Feb 11 16:49:05 2021 -0800

    KAFKA-12272: Fix commit-interval metrics (#10102)
    
    Reviewer: A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  |   5 +-
 .../processor/internals/StreamThreadTest.java      | 116 ++++++++++++++++++++-
 2 files changed, 118 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3ea40d5..3dc0c02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -776,9 +776,10 @@ public class StreamThread extends Thread {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitMs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitMs, 0);
                 totalCommitLatency += commitLatency;
                 if (committed > 0) {
                     commitSensor.record(commitLatency / (double) committed, now);
@@ -1016,7 +1017,7 @@ public class StreamThread extends Thread {
             if (committed == -1) {
                 log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
             } else {
-                advanceNowAndComputeLatency();
+                now = time.milliseconds();
                 lastCommitMs = now;
             }
         } else {
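
The change above replaces the shared advanceNowAndComputeLatency() helper with an explicit wall-clock delta around the commit call, so that only the time spent committing is recorded (and clamped at zero). Below is a self-contained sketch of that measurement pattern, using the public Metrics/Sensor classes as a stand-in for the thread's commitSensor; the class name and the maybeCommit stand-in are illustrative, not the actual StreamThread members:

    import java.util.function.IntSupplier;

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;
    import org.apache.kafka.common.utils.Time;

    public class CommitLatencySketch {
        public static void main(final String[] args) {
            final Time time = Time.SYSTEM;
            final Metrics metrics = new Metrics();
            final Sensor commitSensor = metrics.sensor("commit-latency");
            commitSensor.add(metrics.metricName("commit-latency-avg", "sketch-metrics"), new Avg());
            commitSensor.add(metrics.metricName("commit-latency-max", "sketch-metrics"), new Max());

            // Stand-in for StreamThread#maybeCommit(): pretend two tasks were committed.
            final IntSupplier maybeCommit = () -> 2;

            long now = time.milliseconds();

            // The pattern from the fix: remember the wall-clock time before the commit,
            // refresh it after the commit returns, and record only the elapsed commit time.
            final long beforeCommitMs = now;
            final int committed = maybeCommit.getAsInt();
            now = time.milliseconds();
            final long commitLatency = Math.max(now - beforeCommitMs, 0);

            if (committed > 0) {
                // Average latency per committed task, mirroring the line in the diff above.
                commitSensor.record(commitLatency / (double) committed, now);
            }

            // In this toy example the "commit" returns immediately, so the value is ~0.
            System.out.println(metrics.metric(metrics.metricName("commit-latency-avg", "sketch-metrics")).metricValue());
            metrics.close();
        }
    }
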
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2f95a20..e4d083b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -117,6 +117,7 @@ import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.niceMock;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -836,6 +837,119 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldRecordCommitLatency() {
+        final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+        expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        expect(consumer.poll(anyObject())).andStubReturn(new ConsumerRecords<>(Collections.emptyMap()));
+        final Task task = niceMock(Task.class);
+        expect(task.id()).andStubReturn(task1);
+        expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1));
+        final ActiveTaskCreator activeTaskCreator = mock(ActiveTaskCreator.class);
+        expect(activeTaskCreator.createTasks(anyObject(), anyObject())).andStubReturn(Collections.singleton(task));
+        expect(activeTaskCreator.producerClientIds()).andStubReturn(Collections.singleton("producerClientId"));
+        EasyMock.replay(consumer, consumerGroupMetadata, task, activeTaskCreator);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+
+        final TaskManager taskManager = new TaskManager(
+            null,
+            null,
+            null,
+            null,
+            null,
+            activeTaskCreator,
+            null,
+            internalTopologyBuilder,
+            null,
+            null,
+            null
+        ) {
+            @Override
+            int commit(final Collection<Task> tasksToCommit) {
+                mockTime.sleep(10L);
+                return 1;
+            }
+        };
+        taskManager.setMainConsumer(consumer);
+
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            config,
+            null,
+            consumer,
+            consumer,
+            changelogReader,
+            null,
+            taskManager,
+            streamsMetrics,
+            internalTopologyBuilder,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            HANDLER,
+            null
+        );
+        thread.updateThreadMetadata("adminClientId");
+        thread.setState(StreamThread.State.STARTING);
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(task1, Collections.singleton(t1p1));
+        thread.taskManager().handleAssignment(activeTasks, emptyMap());
+        thread.rebalanceListener().onPartitionsAssigned(Collections.singleton(t1p1));
+
+        assertTrue(
+            Double.isNaN(
+                (Double) streamsMetrics.metrics().get(new MetricName(
+                    "commit-latency-max",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID))
+                ).metricValue()
+            )
+        );
+        assertTrue(
+            Double.isNaN(
+                (Double) streamsMetrics.metrics().get(new MetricName(
+                    "commit-latency-avg",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID))
+                ).metricValue()
+            )
+        );
+
+        thread.runOnce();
+
+        assertThat(
+            streamsMetrics.metrics().get(
+                new MetricName(
+                    "commit-latency-max",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID)
+                )
+            ).metricValue(),
+            equalTo(10.0)
+        );
+        assertThat(
+            streamsMetrics.metrics().get(
+                new MetricName(
+                    "commit-latency-avg",
+                    "stream-thread-metrics",
+                    "",
+                    Collections.singletonMap("thread-id", CLIENT_ID)
+                )
+            ).metricValue(),
+            equalTo(10.0)
+        );
+    }
+
+    @Test
     public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
         internalStreamsBuilder.buildAndOptimizeTopology();
@@ -2621,7 +2735,7 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
         internalTopologyBuilder.addProcessor(
             "processor1",
-            (ProcessorSupplier<byte[], byte[], ?, ?>) () -> new MockApiProcessor<>(),
+            (ProcessorSupplier<byte[], byte[], ?, ?>) MockApiProcessor::new,
             "source1"
         );
     }
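
The new test above asserts on the thread-level commit-latency-avg and commit-latency-max metrics in the stream-thread-metrics group. For context, a small sketch of how those same metrics could be inspected on a running application via KafkaStreams#metrics(); the class and method names are illustrative:

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.streams.KafkaStreams;

    public final class CommitLatencyMetricsProbe {

        // Print the thread-level commit-latency metrics; `streams` is assumed
        // to be an already-running KafkaStreams instance.
        public static void printCommitLatency(final KafkaStreams streams) {
            final Map<MetricName, ? extends Metric> metrics = streams.metrics();
            for (final MetricName name : metrics.keySet()) {
                if ("stream-thread-metrics".equals(name.group())
                        && ("commit-latency-avg".equals(name.name()) || "commit-latency-max".equals(name.name()))) {
                    System.out.println(name.name() + " " + name.tags() + " = " + metrics.get(name).metricValue());
                }
            }
        }
    }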