You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/16 20:46:35 UTC

kafka git commit: KAFKA-4359: Remove commit interval in integration tests for testing caching effects

Repository: kafka
Updated Branches:
  refs/heads/trunk 39a1c42df -> 04a13e82a


KAFKA-4359: Remove commit interval in integration tests for testing caching effects

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #2124 from enothereska/KAFKA-4359-intergration-tests-commit1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04a13e82
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04a13e82
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04a13e82

Branch: refs/heads/trunk
Commit: 04a13e82a6cb6346589262faf65e8b484a1948b6
Parents: 39a1c42
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Nov 16 12:46:32 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 16 12:46:32 2016 -0800

----------------------------------------------------------------------
 .../KStreamKTableJoinIntegrationTest.java       | 27 ++++++++++++--------
 .../integration/KStreamRepartitionJoinTest.java |  1 -
 .../KTableKTableJoinIntegrationTest.java        |  5 ----
 .../QueryableStateIntegrationTest.java          |  1 -
 .../integration/ResetIntegrationTest.java       |  1 -
 5 files changed, 16 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 02beee3..a939ef2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -92,7 +92,6 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
             TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
     }
@@ -169,16 +168,22 @@ public class KStreamKTableJoinIntegrationTest {
             new KeyValue<>("fang", "asia")
         );
 
-        final List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
-            new KeyValue<>("europe", 13L),
-            new KeyValue<>("americas", 4L),
-            new KeyValue<>("asia", 25L),
-            new KeyValue<>("americas", 23L),
-            new KeyValue<>("europe", 69L),
-            new KeyValue<>("americas", 101L),
-            new KeyValue<>("europe", 109L),
-            new KeyValue<>("asia", 124L)
-        );
+        final List<KeyValue<String, Long>> expectedClicksPerRegion = (cacheSizeBytes == 0) ?
+            Arrays.asList(
+                new KeyValue<>("europe", 13L),
+                new KeyValue<>("americas", 4L),
+                new KeyValue<>("asia", 25L),
+                new KeyValue<>("americas", 23L),
+                new KeyValue<>("europe", 69L),
+                new KeyValue<>("americas", 101L),
+                new KeyValue<>("europe", 109L),
+                new KeyValue<>("asia", 124L)
+            ) :
+            Arrays.asList(
+                new KeyValue<>("americas", 101L),
+                new KeyValue<>("europe", 109L),
+                new KeyValue<>("asia", 124L)
+            );
 
         //
         // Step 1: Configure and start the processor topology.

http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index d9a9768..fe86874 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -103,7 +103,6 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
         streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);

http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 5a0d368..eeb9177 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -124,10 +124,6 @@ public class KTableKTableJoinIntegrationTest {
         };
     }
 
-    public static Object[] data() {
-        return new Object[]{0, 10 * 1024 * 1024L};
-    }
-
     @BeforeClass
     public static void beforeTest() throws Exception {
         CLUSTER.createTopic(TABLE_1);
@@ -142,7 +138,6 @@ public class KTableKTableJoinIntegrationTest {
         streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
 
         final Properties producerConfig = new Properties();

http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 66b6d2e..d89c33a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -142,7 +142,6 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration
             .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 4bc74ae..62238e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -171,7 +171,6 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);