You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/15 18:59:41 UTC
[kafka] branch 1.1 updated: MINOR: Resuming Tasks should not be initialized twice (#4562)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 9a36ded MINOR: Resuming Tasks should not be initialized twice (#4562)
9a36ded is described below
commit 9a36ded2f87dd95f5e23cbd5b6828ad20909f4f6
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Thu Feb 15 10:52:58 2018 -0800
MINOR: Resuming Tasks should not be initialized twice (#4562)
Avoids double initialization of resuming tasks
Removes race condition in StreamThreadTest plus code cleanup
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../streams/processor/internals/StreamTask.java | 1 -
.../processor/internals/StreamsMetricsImpl.java | 6 ++--
.../processor/internals/StreamThreadTest.java | 37 +++++++++++-----------
3 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 56c0ab3..6bca02a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -192,7 +192,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
transactionInFlight = true;
}
- initTopology();
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 03bbceb..b2ce2e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -129,11 +129,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
- addLatencyMetrics(scopeName, parent, operationName, allTagMap);
+ addLatencyAndThroughputMetrics(scopeName, parent, operationName, allTagMap);
// add the operation metrics with additional tags
Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
- addLatencyMetrics(scopeName, sensor, operationName, tagMap);
+ addLatencyAndThroughputMetrics(scopeName, sensor, operationName, tagMap);
parentSensors.put(sensor, parent);
@@ -161,7 +161,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
return sensor;
}
- private void addLatencyMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
+ private void addLatencyAndThroughputMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
"The average latency of " + opName + " operation.", tags), new Avg());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e67fe14..6529be7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -123,8 +123,6 @@ public class StreamThreadTest {
};
}
-
- @SuppressWarnings("unchecked")
@Test
public void testPartitionAssignmentChangeForSingleGroup() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
@@ -160,14 +158,13 @@ public class StreamThreadTest {
assertTrue(thread.state() == StreamThread.State.PENDING_SHUTDOWN);
}
- @SuppressWarnings("unchecked")
@Test
public void testStateChangeStartClose() throws InterruptedException {
-
final StreamThread thread = createStreamThread(clientId, config, false);
final StateListenerStub stateListener = new StateListenerStub();
thread.setStateListener(stateListener);
+
thread.start();
TestUtils.waitForCondition(new TestCondition() {
@Override
@@ -175,19 +172,20 @@ public class StreamThreadTest {
return thread.state() == StreamThread.State.RUNNING;
}
}, 10 * 1000, "Thread never started.");
+
thread.shutdown();
- assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN);
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return thread.state() == StreamThread.State.DEAD;
}
}, 10 * 1000, "Thread never shut down.");
+
thread.shutdown();
assertEquals(thread.state(), StreamThread.State.DEAD);
}
- private Cluster createCluster(int numNodes) {
+ private Cluster createCluster(final int numNodes) {
HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; ++i) {
nodes.put(i, new Node(i, "localhost", 8121 + i));
@@ -352,7 +350,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException {
+ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, config, false);
@@ -384,7 +382,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException {
+ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
@@ -413,7 +411,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
+ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
@@ -445,7 +443,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldShutdownTaskManagerOnClose() throws InterruptedException {
+ public void shouldShutdownTaskManagerOnClose() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap());
@@ -483,6 +481,7 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldShutdownTaskManagerOnCloseWithoutStart() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -511,6 +510,7 @@ public class StreamThreadTest {
EasyMock.verify(taskManager);
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldOnlyShutdownOnce() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -542,7 +542,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
+ public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
internalTopologyBuilder.addSink("out", "output", null, null, null);
@@ -563,7 +563,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException {
+ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
@@ -672,7 +672,8 @@ public class StreamThreadTest {
ThreadStateTransitionValidator newState = null;
@Override
- public void onChange(final Thread thread, final ThreadStateTransitionValidator newState,
+ public void onChange(final Thread thread,
+ final ThreadStateTransitionValidator newState,
final ThreadStateTransitionValidator oldState) {
++numChanges;
if (this.newState != null) {
@@ -690,7 +691,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException {
+ public void shouldReturnActiveTaskMetadataWhileRunningState() {
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
final StreamThread thread = createStreamThread(clientId, config, false);
@@ -720,7 +721,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
+ public void shouldReturnStandbyTaskMetadataWhileRunningState() {
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
.groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
@@ -760,7 +761,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldAlwaysUpdateTasksMetadataAfterChangingState() throws InterruptedException {
+ public void shouldAlwaysUpdateTasksMetadataAfterChangingState() {
final StreamThread thread = createStreamThread(clientId, config, false);
ThreadMetadata metadata = thread.threadMetadata();
assertEquals(StreamThread.State.CREATED.name(), metadata.threadState());
@@ -771,7 +772,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
+ public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
.groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
@@ -912,7 +913,7 @@ public class StreamThreadTest {
}
@Test
- public void shouldReportSkippedRecordsForInvalidTimestamps() throws Exception {
+ public void shouldReportSkippedRecordsForInvalidTimestamps() {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
final Properties config = configProps(false);
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.