You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/16 20:46:35 UTC
kafka git commit: KAFKA-4359: Remove commit interval in integration tests for testing caching effects
Repository: kafka
Updated Branches:
refs/heads/trunk 39a1c42df -> 04a13e82a
KAFKA-4359: Remove commit interval in integration tests for testing caching effects
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #2124 from enothereska/KAFKA-4359-intergration-tests-commit1
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/04a13e82
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/04a13e82
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/04a13e82
Branch: refs/heads/trunk
Commit: 04a13e82a6cb6346589262faf65e8b484a1948b6
Parents: 39a1c42
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Nov 16 12:46:32 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 16 12:46:32 2016 -0800
----------------------------------------------------------------------
.../KStreamKTableJoinIntegrationTest.java | 27 ++++++++++++--------
.../integration/KStreamRepartitionJoinTest.java | 1 -
.../KTableKTableJoinIntegrationTest.java | 5 ----
.../QueryableStateIntegrationTest.java | 1 -
.../integration/ResetIntegrationTest.java | 1 -
5 files changed, 16 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 02beee3..a939ef2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -92,7 +92,6 @@ public class KStreamKTableJoinIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
}
@@ -169,16 +168,22 @@ public class KStreamKTableJoinIntegrationTest {
new KeyValue<>("fang", "asia")
);
- final List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
- new KeyValue<>("europe", 13L),
- new KeyValue<>("americas", 4L),
- new KeyValue<>("asia", 25L),
- new KeyValue<>("americas", 23L),
- new KeyValue<>("europe", 69L),
- new KeyValue<>("americas", 101L),
- new KeyValue<>("europe", 109L),
- new KeyValue<>("asia", 124L)
- );
+ final List<KeyValue<String, Long>> expectedClicksPerRegion = (cacheSizeBytes == 0) ?
+ Arrays.asList(
+ new KeyValue<>("europe", 13L),
+ new KeyValue<>("americas", 4L),
+ new KeyValue<>("asia", 25L),
+ new KeyValue<>("americas", 23L),
+ new KeyValue<>("europe", 69L),
+ new KeyValue<>("americas", 101L),
+ new KeyValue<>("europe", 109L),
+ new KeyValue<>("asia", 124L)
+ ) :
+ Arrays.asList(
+ new KeyValue<>("americas", 101L),
+ new KeyValue<>("europe", 109L),
+ new KeyValue<>("asia", 124L)
+ );
//
// Step 1: Configure and start the processor topology.
http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index d9a9768..fe86874 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -103,7 +103,6 @@ public class KStreamRepartitionJoinTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 5a0d368..eeb9177 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -124,10 +124,6 @@ public class KTableKTableJoinIntegrationTest {
};
}
- public static Object[] data() {
- return new Object[]{0, 10 * 1024 * 1024L};
- }
-
@BeforeClass
public static void beforeTest() throws Exception {
CLUSTER.createTopic(TABLE_1);
@@ -142,7 +138,6 @@ public class KTableKTableJoinIntegrationTest {
streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
- streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final Properties producerConfig = new Properties();
http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 66b6d2e..d89c33a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -142,7 +142,6 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration
.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
http://git-wip-us.apache.org/repos/asf/kafka/blob/04a13e82/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 4bc74ae..62238e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -171,7 +171,6 @@ public class ResetIntegrationTest {
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);