You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/15 18:59:41 UTC

[kafka] branch 1.1 updated: MINOR: Resuming Tasks should not be initialized twice (#4562)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 9a36ded  MINOR: Resuming Tasks should not be initialized twice (#4562)
9a36ded is described below

commit 9a36ded2f87dd95f5e23cbd5b6828ad20909f4f6
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Feb 15 10:52:58 2018 -0800

    MINOR: Resuming Tasks should not be initialized twice (#4562)
    
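    Avoids double initialization of resuming tasks by removing the redundant
    initTopology() call in StreamTask.
    Removes a race condition in StreamThreadTest (the thread-state assertion
    made immediately after shutdown()), plus code cleanup.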
    
    Author: Matthias J. Sax <ma...@confluent.io>
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    |  1 -
 .../processor/internals/StreamsMetricsImpl.java    |  6 ++--
 .../processor/internals/StreamThreadTest.java      | 37 +++++++++++-----------
 3 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 56c0ab3..6bca02a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -192,7 +192,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             }
             transactionInFlight = true;
         }
-        initTopology();
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 03bbceb..b2ce2e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -129,11 +129,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addLatencyMetrics(scopeName, parent, operationName, allTagMap);
+        addLatencyAndThroughputMetrics(scopeName, parent, operationName, allTagMap);
 
         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
-        addLatencyMetrics(scopeName, sensor, operationName, tagMap);
+        addLatencyAndThroughputMetrics(scopeName, sensor, operationName, tagMap);
 
         parentSensors.put(sensor, parent);
 
@@ -161,7 +161,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         return sensor;
     }
 
-    private void addLatencyMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
+    private void addLatencyAndThroughputMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
 
         maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
             "The average latency of " + opName + " operation.", tags), new Avg());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14..6529be7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -123,8 +123,6 @@ public class StreamThreadTest {
         };
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
     public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
@@ -160,14 +158,13 @@ public class StreamThreadTest {
         assertTrue(thread.state() == StreamThread.State.PENDING_SHUTDOWN);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testStateChangeStartClose() throws InterruptedException {
-
         final StreamThread thread = createStreamThread(clientId, config, false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
+
         thread.start();
         TestUtils.waitForCondition(new TestCondition() {
             @Override
@@ -175,19 +172,20 @@ public class StreamThreadTest {
                 return thread.state() == StreamThread.State.RUNNING;
             }
         }, 10 * 1000, "Thread never started.");
+
         thread.shutdown();
-        assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN);
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
                 return thread.state() == StreamThread.State.DEAD;
             }
         }, 10 * 1000, "Thread never shut down.");
+
         thread.shutdown();
         assertEquals(thread.state(), StreamThread.State.DEAD);
     }
 
-    private Cluster createCluster(int numNodes) {
+    private Cluster createCluster(final int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         for (int i = 0; i < numNodes; ++i) {
             nodes.put(i, new Node(i, "localhost", 8121 + i));
@@ -352,7 +350,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException {
+    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final StreamThread thread = createStreamThread(clientId, config, false);
@@ -384,7 +382,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException {
+    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
@@ -413,7 +411,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
+    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
@@ -445,7 +443,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldShutdownTaskManagerOnClose() throws InterruptedException {
+    public void shouldShutdownTaskManagerOnClose() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
         EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap());
@@ -483,6 +481,7 @@ public class StreamThreadTest {
         EasyMock.verify(taskManager);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldShutdownTaskManagerOnCloseWithoutStart() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -511,6 +510,7 @@ public class StreamThreadTest {
         EasyMock.verify(taskManager);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldOnlyShutdownOnce() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -542,7 +542,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
+    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
         internalTopologyBuilder.addSink("out", "output", null, null, null);
 
@@ -563,7 +563,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException {
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
         internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
 
@@ -672,7 +672,8 @@ public class StreamThreadTest {
         ThreadStateTransitionValidator newState = null;
 
         @Override
-        public void onChange(final Thread thread, final ThreadStateTransitionValidator newState,
+        public void onChange(final Thread thread,
+                             final ThreadStateTransitionValidator newState,
                              final ThreadStateTransitionValidator oldState) {
             ++numChanges;
             if (this.newState != null) {
@@ -690,7 +691,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
+    public void shouldReturnActiveTaskMetadataWhileRunningState() {
         internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
 
         final StreamThread thread = createStreamThread(clientId, config, false);
@@ -720,7 +721,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
+    public void shouldReturnStandbyTaskMetadataWhileRunningState() {
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
 
@@ -760,7 +761,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
+    public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
         final StreamThread thread = createStreamThread(clientId, config, false);
         ThreadMetadata metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.CREATED.name(), metadata.threadState());
@@ -771,7 +772,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
+    public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
 
@@ -912,7 +913,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception {
+    public void shouldReportSkippedRecordsForInvalidTimestamps() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final Properties config = configProps(false);

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.