You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/08/16 17:41:51 UTC

kafka git commit: KAFKA-3769: Create new sensors per-thread in KafkaStreams

Repository: kafka
Updated Branches:
  refs/heads/trunk e49b3aee6 -> ae237be1b


KAFKA-3769: Create new sensors per-thread in KafkaStreams

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Michael G. Noll <mi...@confluent.io>, Greg Fodor <gf...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1530 from guozhangwang/K3769-per-thread-metrics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae237be1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae237be1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae237be1

Branch: refs/heads/trunk
Commit: ae237be1bbfd7d1b84a5095491a6131cd3cc9346
Parents: e49b3ae
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Aug 16 10:43:12 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Aug 16 10:43:12 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |   3 +-
 .../clients/producer/internals/Sender.java      |   7 +-
 config/server.properties                        |   3 +
 .../apache/kafka/streams/StreamsMetrics.java    |  12 ++
 .../processor/internals/StreamThread.java       | 159 +++++++++++--------
 .../kafka/streams/perf/SimpleBenchmark.java     |  11 +-
 .../processor/internals/StreamThreadTest.java   |   6 +-
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 8 files changed, 123 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fec9b6e..913ce9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -807,8 +807,7 @@ public class Fetcher<K, V> {
             String name = "topic." + topic + ".bytes-fetched";
             Sensor bytesFetched = this.metrics.getSensor(name);
             if (bytesFetched == null) {
-                Map<String, String> metricTags = new HashMap<>(1);
-                metricTags.put("topic", topic.replace('.', '_'));
+                Map<String, String> metricTags = Collections.singletonMap("topic", topic.replace('.', '_'));
 
                 bytesFetched = this.metrics.sensor(name);
                 bytesFetched.add(this.metrics.metricName("fetch-size-avg",

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index f1852b5..30f8887 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -14,9 +14,9 @@ package org.apache.kafka.clients.producer.internals;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -457,14 +457,13 @@ public class Sender implements Runnable {
             });
         }
 
-        public void maybeRegisterTopicMetrics(String topic) {
+        private void maybeRegisterTopicMetrics(String topic) {
             // if one sensor of the metrics has been registered for the topic,
             // then all other sensors should have been registered; and vice versa
             String topicRecordsCountName = "topic." + topic + ".records-per-batch";
             Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
             if (topicRecordCount == null) {
-                Map<String, String> metricTags = new LinkedHashMap<String, String>();
-                metricTags.put("topic", topic);
+                Map<String, String> metricTags = Collections.singletonMap("topic", topic);
                 String metricGrpName = "producer-topic-metrics";
 
                 topicRecordCount = this.metrics.sensor(topicRecordsCountName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index d1b1753..f00a7d6 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -20,6 +20,9 @@
 # The id of the broker. This must be set to a unique integer for each broker.
 broker.id=0
 
+# Switch to enable or disable topic deletion; the default value is false
+#delete.topic.enable=true
+
 ############################# Socket Server Settings #############################
 
 # The address the socket server listens on. It will get the value returned from 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
index 70c3320..c0870c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -24,7 +24,19 @@ import org.apache.kafka.common.metrics.Sensor;
  */
 public interface StreamsMetrics {
 
+    /**
+     * Add the latency sensor.
+     *
+     * @param scopeName Name of the scope, could be the type of the state store, etc.
+     * @param entityName Name of the entity, could be the name of the state store instance, etc.
+     * @param operationName Name of the operation, could be get / put / delete / etc.
+     * @param tags Additional tags of the sensor.
+     * @return The added sensor.
+     */
     Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
 
+    /**
+     * Record the given latency value of the sensor.
+     */
     void recordLatency(Sensor sensor, long startNs, long endNs);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f416443..50d77c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -52,7 +52,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -83,6 +82,7 @@ public class StreamThread extends Thread {
     protected final Consumer<byte[], byte[]> consumer;
     protected final Consumer<byte[], byte[]> restoreConsumer;
 
+    private final String threadClientId;
     private final AtomicBoolean running;
     private final Map<TaskId, StreamTask> activeTasks;
     private final Map<TaskId, StandbyTask> standbyTasks;
@@ -98,8 +98,9 @@ public class StreamThread extends Thread {
 
     private StreamPartitionAssignor partitionAssignor = null;
 
-    private long lastClean;
-    private long lastCommit;
+    private long timerStartedMs;
+    private long lastCleanMs;
+    private long lastCommitMs;
     private Throwable rebalanceException = null;
 
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
@@ -111,7 +112,7 @@ public class StreamThread extends Thread {
             try {
                 addStreamTasks(assignment);
                 addStandbyTasks();
-                lastClean = time.milliseconds(); // start the cleaning cycle
+                lastCleanMs = time.milliseconds(); // start the cleaning cycle
                 streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
             } catch (Throwable t) {
                 rebalanceException = t;
@@ -123,7 +124,7 @@ public class StreamThread extends Thread {
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
                 commitAll();
-                lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
+                lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
             } catch (Throwable t) {
                 rebalanceException = t;
                 throw t;
@@ -160,7 +161,7 @@ public class StreamThread extends Thread {
 
         // set the producer and consumer clients
         String threadName = getName();
-        String threadClientId = clientId + "-" + threadName;
+        threadClientId = clientId + "-" + threadName;
         log.info("Creating producer client for stream thread [{}]", threadName);
         this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
         log.info("Creating consumer client for stream thread [{}]", threadName);
@@ -187,9 +188,10 @@ public class StreamThread extends Thread {
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
 
-        this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
-        this.lastCommit = time.milliseconds();
         this.time = time;
+        this.timerStartedMs = time.milliseconds();
+        this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
+        this.lastCommitMs = timerStartedMs;
 
         this.sensors = new StreamsMetricsImpl(metrics);
 
@@ -274,10 +276,26 @@ public class StreamThread extends Thread {
         log.info("Stream thread shutdown complete [" + this.getName() + "]");
     }
 
+    /**
+     * Compute the latency based on the current marked timestamp,
+     * and update the marked timestamp with the current system timestamp.
+     *
+     * @return latency
+     */
+    private long computeLatency() {
+        long previousTimeMs = this.timerStartedMs;
+        this.timerStartedMs = time.milliseconds();
+
+        return Math.max(this.timerStartedMs - previousTimeMs, 0);
+    }
+
     private void runLoop() {
         int totalNumBuffered = 0;
-        long lastPoll = 0L;
         boolean requiresPoll = true;
+        boolean polledRecords = false;
+
+        // TODO: this can be removed after KIP-62
+        long lastPoll = 0L;
 
         if (topicPattern != null) {
             consumer.subscribe(topicPattern, rebalanceListener);
@@ -287,13 +305,15 @@ public class StreamThread extends Thread {
 
 
         while (stillRunning()) {
+            this.timerStartedMs = time.milliseconds();
+
             // try to fetch some records if necessary
             if (requiresPoll) {
                 requiresPoll = false;
 
-                long startPoll = time.milliseconds();
+                boolean longPoll = totalNumBuffered == 0;
 
-                ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(longPoll ? this.pollTimeMs : 0);
                 lastPoll = time.milliseconds();
 
                 if (rebalanceException != null)
@@ -304,41 +324,51 @@ public class StreamThread extends Thread {
                         StreamTask task = activeTasksByPartition.get(partition);
                         task.addRecords(partition, records.records(partition));
                     }
+                    polledRecords = true;
+                } else {
+                    polledRecords = false;
                 }
 
-                long endPoll = time.milliseconds();
-                sensors.pollTimeSensor.record(endPoll - startPoll);
+                // only record poll latency if long poll is required
+                if (longPoll) {
+                    sensors.pollTimeSensor.record(computeLatency());
+                }
             }
 
-            totalNumBuffered = 0;
-
             // try to process one fetch record from each task via the topology, and also trigger punctuate
             // functions if necessary, which may result in more records going through the topology in this loop
-            if (!activeTasks.isEmpty()) {
-                for (StreamTask task : activeTasks.values()) {
-                    long startProcess = time.milliseconds();
+            if (totalNumBuffered > 0 || polledRecords) {
+                totalNumBuffered = 0;
 
-                    totalNumBuffered += task.process();
-                    requiresPoll = requiresPoll || task.requiresPoll();
+                if (!activeTasks.isEmpty()) {
+                    for (StreamTask task : activeTasks.values()) {
 
-                    sensors.processTimeSensor.record(time.milliseconds() - startProcess);
+                        totalNumBuffered += task.process();
 
-                    maybePunctuate(task);
+                        requiresPoll = requiresPoll || task.requiresPoll();
 
-                    if (task.commitNeeded())
-                        commitOne(task, time.milliseconds());
-                }
+                        sensors.processTimeSensor.record(computeLatency());
 
-                // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
-                // even when we paused all partitions.
-                if (lastPoll + this.pollTimeMs < time.milliseconds())
-                    requiresPoll = true;
+                        maybePunctuate(task);
+
+                        if (task.commitNeeded())
+                            commitOne(task);
+                    }
+
+                    // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
+                    // even when we paused all partitions.
+                    if (lastPoll + this.pollTimeMs < this.timerStartedMs)
+                        requiresPoll = true;
 
+                } else {
+                    // even when no task is assigned, we must poll to get a task.
+                    requiresPoll = true;
+                }
+                maybeCommit();
             } else {
-                // even when no task is assigned, we must poll to get a task.
                 requiresPoll = true;
             }
-            maybeCommit();
+
             maybeUpdateStandbyTasks();
 
             maybeClean();
@@ -401,12 +431,10 @@ public class StreamThread extends Thread {
 
     private void maybePunctuate(StreamTask task) {
         try {
-            long now = time.milliseconds();
-
             // check whether we should punctuate based on the task's partition group timestamp;
             // which are essentially based on record timestamp.
             if (task.maybePunctuate())
-                sensors.punctuateTimeSensor.record(time.milliseconds() - now);
+                sensors.punctuateTimeSensor.record(computeLatency());
 
         } catch (KafkaException e) {
             log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
@@ -414,35 +442,50 @@ public class StreamThread extends Thread {
         }
     }
 
+    /**
+     * Commit all tasks owned by this thread if specified interval time has elapsed
+     */
     protected void maybeCommit() {
         long now = time.milliseconds();
 
-        if (commitTimeMs >= 0 && lastCommit + commitTimeMs < now) {
+        if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
             log.trace("Committing processor instances because the commit interval has elapsed.");
 
             commitAll();
-            lastCommit = now;
+            lastCommitMs = now;
 
             processStandbyRecords = true;
         }
     }
 
     /**
+     * Cleanup any states of the tasks that have been removed from this thread
+     */
+    protected void maybeClean() {
+        long now = time.milliseconds();
+
+        if (now > lastCleanMs + cleanTimeMs) {
+            stateDirectory.cleanRemovedTasks();
+            lastCleanMs = now;
+        }
+    }
+
+    /**
      * Commit the states of all its tasks
      */
     private void commitAll() {
         for (StreamTask task : activeTasks.values()) {
-            commitOne(task, time.milliseconds());
+            commitOne(task);
         }
         for (StandbyTask task : standbyTasks.values()) {
-            commitOne(task, time.milliseconds());
+            commitOne(task);
         }
     }
 
     /**
      * Commit the state of a task
      */
-    private void commitOne(AbstractTask task, long now) {
+    private void commitOne(AbstractTask task) {
         try {
             task.commit();
         } catch (CommitFailedException e) {
@@ -454,19 +497,7 @@ public class StreamThread extends Thread {
             throw e;
         }
 
-        sensors.commitTimeSensor.record(time.milliseconds() - now);
-    }
-
-    /**
-     * Cleanup any states of the tasks that have been removed from this thread
-     */
-    protected void maybeClean() {
-        long now = time.milliseconds();
-
-        if (now > lastClean + cleanTimeMs) {
-            stateDirectory.cleanRemovedTasks();
-            lastClean = now;
-        }
+        sensors.commitTimeSensor.record(computeLatency());
     }
 
     /**
@@ -682,6 +713,7 @@ public class StreamThread extends Thread {
     private class StreamsMetricsImpl implements StreamsMetrics {
         final Metrics metrics;
         final String metricGrpName;
+        final String sensorNamePrefix;
         final Map<String, String> metricTags;
 
         final Sensor commitTimeSensor;
@@ -692,42 +724,41 @@ public class StreamThread extends Thread {
         final Sensor taskDestructionSensor;
 
         public StreamsMetricsImpl(Metrics metrics) {
-
             this.metrics = metrics;
             this.metricGrpName = "stream-metrics";
-            this.metricTags = new LinkedHashMap<>();
-            this.metricTags.put("client-id", clientId + "-" + getName());
+            this.sensorNamePrefix = "thread." + threadClientId;
+            this.metricTags = Collections.singletonMap("client-id", threadClientId);
 
-            this.commitTimeSensor = metrics.sensor("commit-time");
+            this.commitTimeSensor = metrics.sensor(sensorNamePrefix + ".commit-time");
             this.commitTimeSensor.add(metrics.metricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
             this.commitTimeSensor.add(metrics.metricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
             this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
 
-            this.pollTimeSensor = metrics.sensor("poll-time");
+            this.pollTimeSensor = metrics.sensor(sensorNamePrefix + ".poll-time");
             this.pollTimeSensor.add(metrics.metricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
             this.pollTimeSensor.add(metrics.metricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
             this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
 
-            this.processTimeSensor = metrics.sensor("process-time");
+            this.processTimeSensor = metrics.sensor(sensorNamePrefix + ".process-time");
             this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
             this.processTimeSensor.add(metrics.metricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
             this.processTimeSensor.add(metrics.metricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
 
-            this.punctuateTimeSensor = metrics.sensor("punctuate-time");
+            this.punctuateTimeSensor = metrics.sensor(sensorNamePrefix + ".punctuate-time");
             this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
             this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
             this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
 
-            this.taskCreationSensor = metrics.sensor("task-creation");
+            this.taskCreationSensor = metrics.sensor(sensorNamePrefix + ".task-creation");
             this.taskCreationSensor.add(metrics.metricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
 
-            this.taskDestructionSensor = metrics.sensor("task-destruction");
+            this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction");
             this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
         }
 
         @Override
         public void recordLatency(Sensor sensor, long startNs, long endNs) {
-            sensor.record((endNs - startNs) / 1000000, endNs);
+            sensor.record(endNs - startNs, timerStartedMs);
         }
 
         /**
@@ -746,11 +777,11 @@ public class StreamThread extends Thread {
             String metricGroupName = "stream-" + scopeName + "-metrics";
 
             // first add the global operation metrics if not yet, with the global tags only
-            Sensor parent = metrics.sensor(scopeName + "-" + operationName);
+            Sensor parent = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + operationName);
             addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags);
 
             // add the store operation metrics with additional tags
-            Sensor sensor = metrics.sensor(scopeName + "-" + entityName + "-" + operationName, parent);
+            Sensor sensor = metrics.sensor(sensorNamePrefix + "." + scopeName + "-" + entityName + "-" + operationName, parent);
             addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
 
             return sensor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 93bb571..ea9584b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -83,11 +83,6 @@ public class SimpleBenchmark {
         final File rocksdbDir = new File(stateDir, "rocksdb-test");
         rocksdbDir.mkdir();
 
-        System.out.println("SimpleBenchmark instance started");
-        System.out.println("kafka=" + kafka);
-        System.out.println("zookeeper=" + zookeeper);
-        System.out.println("stateDir=" + stateDir);
-
         SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
 
         // producer performance
@@ -232,6 +227,7 @@ public class SimpleBenchmark {
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
 
@@ -250,7 +246,10 @@ public class SimpleBenchmark {
                     break;
             } else {
                 for (ConsumerRecord<Long, byte[]> record : records) {
-                    key = record.key();
+                    Long recKey = record.key();
+
+                    if (key == null || key < recKey)
+                        key = recKey;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3a90ce3..1a66d32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -306,14 +306,14 @@ public class StreamThreadTest {
 
             List<TopicPartition> revokedPartitions;
             List<TopicPartition> assignedPartitions;
-            Map<Integer, StreamTask> prevTasks;
+            Map<TaskId, StreamTask> prevTasks;
 
             //
             // Assign t1p1 and t1p2. This should create task1 & task2
             //
             revokedPartitions = Collections.emptyList();
             assignedPartitions = Arrays.asList(t1p1, t1p2);
-            prevTasks = new HashMap(thread.tasks());
+            prevTasks = new HashMap<>(thread.tasks());
 
             rebalanceListener.onPartitionsRevoked(revokedPartitions);
             rebalanceListener.onPartitionsAssigned(assignedPartitions);
@@ -345,7 +345,7 @@ public class StreamThreadTest {
             //
             revokedPartitions = assignedPartitions;
             assignedPartitions = Collections.emptyList();
-            prevTasks = new HashMap(thread.tasks());
+            prevTasks = new HashMap<>(thread.tasks());
 
             rebalanceListener.onPartitionsRevoked(revokedPartitions);
             rebalanceListener.onPartitionsAssigned(assignedPartitions);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ae237be1/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 0416e40..1baedbb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -218,8 +218,7 @@ public class StreamThreadStateStoreProviderTest {
         }
 
         @Override
-        public void recordLatency(final Sensor sensor, final long startNs,
-                                  final long endNs) {
+        public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
 
         }
     }