You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/01 22:36:11 UTC

kafka git commit: MINOR: Reduce unnecessary calls to time.milliseconds (part 2)

Repository: kafka
Updated Branches:
  refs/heads/trunk b380a82d5 -> ef92bb4e0


MINOR: Reduce unnecessary calls to time.milliseconds (part 2)

Avoid calling time.milliseconds more often than necessary. The cleaning and committing logic can reuse the timestamp taken at the start of each loop iteration with minimal consequences. Improvements of 5-10% were observed at request rates of 450K records/second.

Also tidy up benchmark code a bit more.

Author: Eno Thereska <en...@gmail.com>
Author: Eno Thereska <en...@confluent.io>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #2603 from enothereska/minor-reduce-milliseconds2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef92bb4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef92bb4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef92bb4e

Branch: refs/heads/trunk
Commit: ef92bb4e00da10728cf74c2d81f8f2bbec4c9c02
Parents: b380a82
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Mar 1 14:36:08 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 1 14:36:08 2017 -0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 10 +++----
 .../kafka/streams/perf/SimpleBenchmark.java     | 26 +++++++-----------
 .../processor/internals/StreamThreadTest.java   | 28 ++++++++++----------
 3 files changed, 28 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7f48200..033dc73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -610,10 +610,10 @@ public class StreamThread extends Thread {
             } else {
                 requiresPoll = true;
             }
-            maybeCommit();
+            maybeCommit(timerStartedMs);
             maybeUpdateStandbyTasks();
 
-            maybeClean();
+            maybeClean(timerStartedMs);
         }
         log.info("{} Shutting down at user request", logPrefix);
     }
@@ -682,8 +682,7 @@ public class StreamThread extends Thread {
     /**
      * Commit all tasks owned by this thread if specified interval time has elapsed
      */
-    protected void maybeCommit() {
-        long now = time.milliseconds();
+    protected void maybeCommit(final long now) {
 
         if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
             log.info("{} Committing all tasks because the commit interval {}ms has elapsed", logPrefix, commitTimeMs);
@@ -698,8 +697,7 @@ public class StreamThread extends Thread {
     /**
      * Cleanup any states of the tasks that have been removed from this thread
      */
-    protected void maybeClean() {
-        long now = time.milliseconds();
+    protected void maybeClean(final long now) {
 
         if (now > lastCleanMs + cleanTimeMs) {
             stateDirectory.cleanRemovedTasks(cleanTimeMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index dc1bdf5..cf593e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -141,7 +141,7 @@ public class SimpleBenchmark {
                 kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
                 break;
             case "produce":
-                produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
+                produce(SOURCE_TOPIC);
                 break;
             case "consume":
                 consume(SOURCE_TOPIC);
@@ -444,7 +444,6 @@ public class SimpleBenchmark {
             return;
         }
         produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
-
     }
     /**
      * Produce values to a topic
@@ -458,9 +457,10 @@ public class SimpleBenchmark {
      *                   when this produce step is part of another benchmark that produces its own stats
      */
     private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
-                        int upperRange, boolean printStats) throws Exception {
-
+                         int upperRange, boolean printStats) throws Exception {
 
+        processedRecords = 0;
+        processedBytes = 0;
         if (sequential) {
             if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
         }
@@ -486,17 +486,15 @@ public class SimpleBenchmark {
             producer.send(new ProducerRecord<>(topic, key, value));
             if (sequential) key++;
             else key = rand.nextInt(upperRange);
+            processedRecords++;
+            processedBytes += value.length + Integer.SIZE;
         }
         producer.close();
 
         long endTime = System.currentTimeMillis();
 
         if (printStats) {
-            System.out.println("Producer Performance [records/latency/rec-sec/MB-sec write]: " +
-                numRecords + "/" +
-                (endTime - startTime) + "/" +
-                recordsPerSec(endTime - startTime, numRecords) + "/" +
-                megabytesPerSec(endTime - startTime, numRecords * valueSizeBytes));
+            printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
         }
     }
 
@@ -540,11 +538,7 @@ public class SimpleBenchmark {
         long endTime = System.currentTimeMillis();
 
         consumer.close();
-        System.out.println("Consumer Performance [records/latency/rec-sec/MB-sec read]: " +
-            processedRecords + "/" +
-            (endTime - startTime) + "/" +
-            recordsPerSec(endTime - startTime, processedRecords) + "/" +
-            megabytesPerSec(endTime - startTime, processedBytes));
+        printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
     }
 
     private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
@@ -734,11 +728,11 @@ public class SimpleBenchmark {
     }
 
     private double megabytesPerSec(long time, long processedBytes) {
-        return  ((double) processedBytes / 1024 / 1024) / (time / 1000.0);
+        return  (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
     }
 
     private double recordsPerSec(long time, int numRecords) {
-        return (double) numRecords / ((double) time / 1000);
+        return numRecords / (time / 1000.0);
     }
 
     private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef92bb4e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 87b30b2..e36a236 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -485,7 +485,7 @@ public class StreamThreadTest {
             stateDir3.mkdir();
             extraDir.mkdir();
 
-            MockTime mockTime = new MockTime();
+            final MockTime mockTime = new MockTime();
 
             TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
@@ -495,8 +495,8 @@ public class StreamThreadTest {
                                                    0) {
 
                 @Override
-                public void maybeClean() {
-                    super.maybeClean();
+                public void maybeClean(long now) {
+                    super.maybeClean(now);
                 }
 
                 @Override
@@ -547,7 +547,7 @@ public class StreamThreadTest {
 
             // all directories should still exit before the cleanup delay time
             mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertTrue(stateDir1.exists());
             assertTrue(stateDir2.exists());
             assertTrue(stateDir3.exists());
@@ -555,7 +555,7 @@ public class StreamThreadTest {
 
             // all state directories except for task task2 & task3 will be removed. the extra directory should still exists
             mockTime.sleep(11L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertTrue(stateDir1.exists());
             assertTrue(stateDir2.exists());
             assertFalse(stateDir3.exists());
@@ -585,7 +585,7 @@ public class StreamThreadTest {
 
             // all state directories for task task1 & task2 still exist before the cleanup delay time
             mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertTrue(stateDir1.exists());
             assertTrue(stateDir2.exists());
             assertFalse(stateDir3.exists());
@@ -593,7 +593,7 @@ public class StreamThreadTest {
 
             // all state directories for task task1 & task2 are removed
             mockTime.sleep(11L);
-            thread.maybeClean();
+            thread.maybeClean(mockTime.milliseconds());
             assertFalse(stateDir1.exists());
             assertFalse(stateDir2.exists());
             assertFalse(stateDir3.exists());
@@ -615,7 +615,7 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            MockTime mockTime = new MockTime();
+            final MockTime mockTime = new MockTime();
 
             TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
             builder.addSource("source1", "topic1");
@@ -625,8 +625,8 @@ public class StreamThreadTest {
                                                    0) {
 
                 @Override
-                public void maybeCommit() {
-                    super.maybeCommit();
+                public void maybeCommit(long now) {
+                    super.maybeCommit(now);
                 }
 
                 @Override
@@ -657,14 +657,14 @@ public class StreamThreadTest {
 
             // no task is committed before the commit interval
             mockTime.sleep(commitInterval - 10L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertFalse(((TestStreamTask) task).committed);
             }
 
             // all tasks are committed after the commit interval
             mockTime.sleep(11L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertTrue(((TestStreamTask) task).committed);
                 ((TestStreamTask) task).committed = false;
@@ -672,14 +672,14 @@ public class StreamThreadTest {
 
             // no task is committed before the commit interval, again
             mockTime.sleep(commitInterval - 10L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertFalse(((TestStreamTask) task).committed);
             }
 
             // all tasks are committed after the commit interval, again
             mockTime.sleep(11L);
-            thread.maybeCommit();
+            thread.maybeCommit(mockTime.milliseconds());
             for (StreamTask task : thread.tasks().values()) {
                 assertTrue(((TestStreamTask) task).committed);
                 ((TestStreamTask) task).committed = false;