You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/21 10:13:49 UTC
kafka git commit: KAFKA-4594;
Annotate integration tests and provide gradle build targets to run
subsets of tests
Repository: kafka
Updated Branches:
refs/heads/trunk 05690f0c8 -> fef7fca2a
KAFKA-4594; Annotate integration tests and provide gradle build targets to run subsets of tests
This uses JUnit Categories to identify integration tests. Adds 2 new build targets:
`integrationTest` and `unitTest`.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska <en...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2695 from dguy/junit-categories
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fef7fca2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fef7fca2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fef7fca2
Branch: refs/heads/trunk
Commit: fef7fca2af7fd00bc6b0889062da3f8b56b2224e
Parents: 05690f0
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 21 09:55:46 2017 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Mar 21 10:12:36 2017 +0000
----------------------------------------------------------------------
README.md | 32 ++--
build.gradle | 49 ++++-
.../org/apache/kafka/test/IntegrationTest.java | 20 ++
.../kafka/api/IntegrationTestHarness.scala | 2 +-
.../integration/KafkaServerTestHarness.scala | 2 +-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 8 +-
jenkins.sh | 20 ++
.../apache/kafka/streams/KafkaStreamsTest.java | 3 +
.../integration/FanoutIntegrationTest.java | 18 +-
.../GlobalKTableIntegrationTest.java | 3 +
.../InternalTopicIntegrationTest.java | 3 +
.../integration/JoinIntegrationTest.java | 3 +
.../KStreamAggregationDedupIntegrationTest.java | 3 +
.../KStreamAggregationIntegrationTest.java | 3 +
.../KStreamKTableJoinIntegrationTest.java | 33 ++--
.../integration/KStreamRepartitionJoinTest.java | 29 ++-
...eamsFineGrainedAutoResetIntegrationTest.java | 3 +
.../KTableKTableJoinIntegrationTest.java | 184 ++++++++++---------
.../QueryableStateIntegrationTest.java | 43 ++---
.../integration/RegexSourceIntegrationTest.java | 3 +
.../integration/ResetIntegrationTest.java | 3 +
21 files changed, 289 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 9c2413b..64d5f3c 100644
--- a/README.md
+++ b/README.md
@@ -14,38 +14,42 @@ Java 7 should be used for building in order to support both Java 7 and Java 8 at
Now everything else will work.
-### Building a jar and running it ###
+### Build a jar and run it ###
./gradlew jar
Follow instructions in http://kafka.apache.org/documentation.html#quickstart
-### Building source jar ###
+### Build source jar ###
./gradlew srcJar
-### Building aggregated javadoc ###
+### Build aggregated javadoc ###
./gradlew aggregatedJavadoc
-### Building javadoc and scaladoc ###
+### Build javadoc and scaladoc ###
./gradlew javadoc
./gradlew javadocJar # builds a javadoc jar for each module
./gradlew scaladoc
./gradlew scaladocJar # builds a scaladoc jar for each module
./gradlew docsJar # builds both (if applicable) javadoc and scaladoc jars for each module
-### Running unit tests ###
- ./gradlew test
-
-### Forcing re-running unit tests w/o code change ###
+### Run unit/integration tests ###
+ ./gradlew test # runs both unit and integration tests
+ ./gradlew unitTest
+ ./gradlew integrationTest
+
+### Force re-running tests without code change ###
./gradlew cleanTest test
+ ./gradlew cleanTest unitTest
+ ./gradlew cleanTest integrationTest
-### Running a particular unit test ###
+### Running a particular unit/integration test ###
./gradlew -Dtest.single=RequestResponseSerializationTest core:test
-### Running a particular test method within a unit test ###
+### Running a particular test method within a unit/integration test ###
./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testMetadataUpdateWaitTime
-### Running a particular unit test with log4j output ###
+### Running a particular unit/integration test with log4j output ###
Change the log4j setting in either `clients/src/test/resources/log4j.properties` or `core/src/test/resources/log4j.properties`
./gradlew -i -Dtest.single=RequestResponseSerializationTest core:test
@@ -103,7 +107,7 @@ to avoid known issues with this configuration.
### Building the jar for all scala versions and for all projects ###
./gradlew jarAll
-### Running unit tests for all scala versions and for all projects ###
+### Running unit/integration tests for all scala versions and for all projects ###
./gradlew testAll
### Building a binary release gzipped tar ball for all scala versions ###
@@ -136,7 +140,7 @@ Please note for this to work you should create/update `${GRADLE_USER_HOME}/gradl
### Running code quality checks ###
There are two code quality analysis tools that we regularly run, findbugs and checkstyle.
-#### Checkstyle
+#### Checkstyle ####
Checkstyle enforces a consistent coding style in Kafka.
You can run checkstyle using:
@@ -145,7 +149,7 @@ You can run checkstyle using:
The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` and `reports/checkstyle/reports/test.html` files in the
subproject build directories. They are also are printed to the console. The build will fail if Checkstyle fails.
-#### Findbugs
+#### Findbugs ####
Findbugs uses static analysis to look for bugs in the code.
You can run findbugs using:
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 57beebe..c8f4088 100644
--- a/build.gradle
+++ b/build.gradle
@@ -168,6 +168,10 @@ subprojects {
}
}
+ def testLoggingEvents = ["passed", "skipped", "failed"]
+ def testShowStandardStreams = false
+ def testExceptionFormat = 'full'
+
test {
maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
@@ -176,11 +180,45 @@ subprojects {
jvmArgs = maxPermSizeArgs
testLogging {
- events = userTestLoggingEvents ?: ["passed", "skipped", "failed"]
- showStandardStreams = userShowStandardStreams ?: false
- exceptionFormat = 'full'
+ events = userTestLoggingEvents ?: testLoggingEvents
+ showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
+ exceptionFormat = testExceptionFormat
+ }
+
+ }
+
+ task integrationTest(type: Test, dependsOn: compileJava) {
+ maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
+
+ minHeapSize = "256m"
+ maxHeapSize = "2048m"
+ jvmArgs = maxPermSizeArgs
+
+ testLogging {
+ events = userTestLoggingEvents ?: testLoggingEvents
+ showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
+ exceptionFormat = testExceptionFormat
+ }
+ useJUnit {
+ includeCategories 'org.apache.kafka.test.IntegrationTest'
}
+ }
+ task unitTest(type: Test, dependsOn: compileJava) {
+ maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
+
+ minHeapSize = "256m"
+ maxHeapSize = "2048m"
+ jvmArgs = maxPermSizeArgs
+
+ testLogging {
+ events = userTestLoggingEvents ?: testLoggingEvents
+ showStandardStreams = userShowStandardStreams ?: testShowStandardStreams
+ exceptionFormat = testExceptionFormat
+ }
+ useJUnit {
+ excludeCategories 'org.apache.kafka.test.IntegrationTest'
+ }
}
jar {
@@ -892,6 +930,7 @@ project(':connect:api') {
testCompile libs.junit
testRuntime libs.slf4jlog4j
+ testCompile project(':clients').sourceSets.test.output
}
javadoc {
@@ -929,6 +968,7 @@ project(':connect:transforms') {
testCompile libs.powermockEasymock
testRuntime libs.slf4jlog4j
+ testCompile project(':clients').sourceSets.test.output
}
javadoc {
@@ -966,6 +1006,7 @@ project(':connect:json') {
testCompile libs.powermockEasymock
testRuntime libs.slf4jlog4j
+ testCompile project(':clients').sourceSets.test.output
}
javadoc {
@@ -1016,6 +1057,7 @@ project(':connect:runtime') {
testCompile libs.powermockEasymock
testCompile project(":connect:json")
+ testCompile project(':clients').sourceSets.test.output
testRuntime libs.slf4jlog4j
}
@@ -1068,6 +1110,7 @@ project(':connect:file') {
testCompile libs.powermockEasymock
testRuntime libs.slf4jlog4j
+ testCompile project(':clients').sourceSets.test.output
}
javadoc {
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java b/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java
new file mode 100644
index 0000000..c73a681
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/IntegrationTest.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+public interface IntegrationTest {
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5c8ceea..ef113fb 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -33,7 +33,7 @@ import scala.collection.mutable.Buffer
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
*/
-trait IntegrationTestHarness extends KafkaServerTestHarness {
+abstract class IntegrationTestHarness extends KafkaServerTestHarness {
val producerCount: Int
val consumerCount: Int
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 9f40ec6..af3133a 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.network.ListenerName
/**
* A test harness that brings up some number of broker nodes
*/
-trait KafkaServerTestHarness extends ZooKeeperTestHarness {
+abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
var instanceConfigs: Seq[KafkaConfig] = null
var servers: Buffer[KafkaServer] = null
var brokerList: String = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 519b6fa..5d58036 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -18,12 +18,16 @@
package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{ZkUtils, Logging, CoreUtils}
+
+import kafka.utils.{CoreUtils, Logging, ZkUtils}
import org.junit.{After, Before}
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.test.IntegrationTest
+import org.junit.experimental.categories.Category
-trait ZooKeeperTestHarness extends JUnitSuite with Logging {
+@Category(Array(classOf[IntegrationTest]))
+abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
val zkConnectionTimeout = 10000
val zkSessionTimeout = 6000
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/jenkins.sh
----------------------------------------------------------------------
diff --git a/jenkins.sh b/jenkins.sh
new file mode 100755
index 0000000..0eec2e5
--- /dev/null
+++ b/jenkins.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script is used for verifying changes in Jenkins. In order to provide faster feedback, the tasks are ordered so
+# that faster tasks are executed in every module before slower tasks (if possible). For example, the unit tests for all
+# the modules are executed before the integration tests.
+./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest unitTest integrationTest --no-daemon -Dorg.gradle.project.testLoggingEvents=started,passed,skipped,failed "$@"
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index d4bf471..eebbde9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -28,12 +28,14 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.Collections;
import java.util.HashMap;
@@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@Category({IntegrationTest.class})
public class KafkaStreamsTest {
private static final int NUM_BROKERS = 1;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 421efc7..25dea92 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -31,13 +31,11 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.IntegrationTest;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
@@ -67,7 +65,7 @@ import static org.junit.Assert.assertThat;
* }
* </pre>
*/
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
public class FanoutIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@@ -86,15 +84,6 @@ public class FanoutIntegrationTest {
CLUSTER.createTopic(OUTPUT_TOPIC_C);
}
- @Parameter
- public long cacheSizeBytes;
-
- //Single parameter, use Object[]
- @Parameters
- public static Object[] data() {
- return new Object[] {0, 10 * 1024 * 1024L};
- }
-
@Test
public void shouldFanoutTheInput() throws Exception {
final List<String> inputValues = Arrays.asList("Hello", "World");
@@ -116,7 +105,6 @@ public class FanoutIntegrationTest {
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
final KStream<byte[], String> stream2 = stream1.mapValues(
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 4d9b365..676b464 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -35,12 +35,14 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -48,6 +50,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 3a9d843..4b558f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -38,12 +38,14 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
@@ -60,6 +62,7 @@ import static org.junit.Assert.assertTrue;
/**
* Tests related to internal topics in streams
*/
+@Category({IntegrationTest.class})
public class InternalTopicIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 6f77716..5263456 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -43,6 +44,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.Arrays;
import java.util.Collections;
@@ -58,6 +60,7 @@ import static org.hamcrest.core.Is.is;
/**
* Tests all available joins of Kafka Streams DSL.
*/
+@Category({IntegrationTest.class})
public class JoinIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 7b14cf8..415f593 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -51,6 +52,7 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.utils.MockTime;
+import org.junit.experimental.categories.Category;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -59,6 +61,7 @@ import static org.hamcrest.core.Is.is;
* Similar to KStreamAggregationIntegrationTest but with dedupping enabled
* by virtue of having a large commit interval
*/
+@Category({IntegrationTest.class})
public class KStreamAggregationDedupIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4eb582c..f42ad56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -45,12 +45,14 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -69,6 +71,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
+@Category({IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index e60530d..d566041 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -37,15 +37,13 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -59,8 +57,7 @@ import static org.junit.Assert.assertThat;
* End-to-end integration test that demonstrates how to perform a join between a KStream and a
* KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
*/
-
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
public class KStreamKTableJoinIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@@ -95,7 +92,7 @@ public class KStreamKTableJoinIntegrationTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+
}
@@ -107,16 +104,6 @@ public class KStreamKTableJoinIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
- @Parameter
- public long cacheSizeBytes;
-
- //Single parameter, use Object[]
- @Parameters
- public static Object[] data() {
- return new Object[] {0, 10 * 1024 * 1024L};
-
- }
-
/**
* Tuple for a region and its associated number of clicks.
*/
@@ -147,7 +134,17 @@ public class KStreamKTableJoinIntegrationTest {
}
@Test
- public void shouldCountClicksPerRegion() throws Exception {
+ public void shouldCountClicksPerRegionWithZeroByteCache() throws Exception {
+ countClicksPerRegion(0);
+ }
+
+ @Test
+ public void shouldCountClicksPerRegionWithNonZeroByteCache() throws Exception {
+ countClicksPerRegion(10 * 1024 * 1024);
+ }
+
+ private void countClicksPerRegion(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
// Input 1: Clicks per user (multiple records allowed per user).
final List<KeyValue<String, Long>> userClicks = Arrays.asList(
new KeyValue<>("alice", 13L),
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 5d44255..7da1ffd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
@@ -44,10 +45,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -60,7 +58,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
public class KStreamRepartitionJoinTest {
private static final int NUM_BROKERS = 1;
@@ -87,15 +85,6 @@ public class KStreamRepartitionJoinTest {
private String streamFourInput;
private static volatile int testNo = 0;
- @Parameter
- public long cacheSizeBytes;
-
- //Single parameter, use Object[]
- @Parameters
- public static Object[] data() {
- return new Object[] {0, 10 * 1024 * 1024L};
- }
-
@Before
public void before() throws InterruptedException {
testNo++;
@@ -109,7 +98,6 @@ public class KStreamRepartitionJoinTest {
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
@@ -127,8 +115,17 @@ public class KStreamRepartitionJoinTest {
}
@Test
- public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
+ public void shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache() throws Exception {
+ verifyRepartitionOnJoinOperations(0);
+ }
+ @Test
+ public void shouldCorrectlyRepartitionOnJoinOperationsWithNonZeroSizedCache() throws Exception {
+ verifyRepartitionOnJoinOperations(10 * 1024 * 1024);
+ }
+
+ private void verifyRepartitionOnJoinOperations(final int cacheSizeBytes) throws Exception {
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
produceMessages();
final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index c12b975..3028b6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -37,6 +38,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,6 +51,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+@Category({IntegrationTest.class})
public class KStreamsFineGrainedAutoResetIntegrationTest {
private static final int NUM_BROKERS = 1;
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index baeaf6f..ec40c17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -30,23 +30,24 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.experimental.categories.Category;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
public class KTableKTableJoinIntegrationTest {
private final static int NUM_BROKERS = 1;
@@ -61,71 +62,6 @@ public class KTableKTableJoinIntegrationTest {
private KafkaStreams streams;
private final static Properties CONSUMER_CONFIG = new Properties();
- @Parameterized.Parameter(value = 0)
- public JoinType joinType1;
- @Parameterized.Parameter(value = 1)
- public JoinType joinType2;
- @Parameterized.Parameter(value = 2)
- public List<KeyValue<String, String>> expectedResult;
-
- //Single parameter, use Object[]
- @Parameterized.Parameters
- public static Object[] parameters() {
- return new Object[][]{
- {JoinType.INNER, JoinType.INNER, Arrays.asList(
- new KeyValue<>("b", "B1-B2-B3")//,
- )},
- {JoinType.INNER, JoinType.LEFT, Arrays.asList(
- new KeyValue<>("b", "B1-B2-B3")//,
- )},
- {JoinType.INNER, JoinType.OUTER, Arrays.asList(
- new KeyValue<>("a", "null-A3"),
- new KeyValue<>("b", "null-B3"),
- new KeyValue<>("c", "null-C3"),
- new KeyValue<>("b", "B1-B2-B3")//,
- )},
- {JoinType.LEFT, JoinType.INNER, Arrays.asList(
- new KeyValue<>("a", "A1-null-A3"),
- new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3")//,
- )},
- {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
- new KeyValue<>("a", "A1-null-A3"),
- new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3")//,
- )},
- {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
- new KeyValue<>("a", "null-A3"),
- new KeyValue<>("b", "null-B3"),
- new KeyValue<>("c", "null-C3"),
- new KeyValue<>("a", "A1-null-A3"),
- new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3")//,
- )},
- {JoinType.OUTER, JoinType.INNER, Arrays.asList(
- new KeyValue<>("a", "A1-null-A3"),
- new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C2-C3")
- )},
- {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
- new KeyValue<>("a", "A1-null-A3"),
- new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C2-C3")
- )},
- {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
- new KeyValue<>("a", "null-A3"),
- new KeyValue<>("b", "null-B3"),
- new KeyValue<>("c", "null-C3"),
- new KeyValue<>("a", "A1-null-A3"),
- new KeyValue<>("b", "B1-null-B3"),
- new KeyValue<>("b", "B1-B2-B3"),
- new KeyValue<>("c", "null-C2-C3")
- )}
- };
- }
-
@BeforeClass
public static void beforeTest() throws Exception {
CLUSTER.createTopic(TABLE_1);
@@ -194,7 +130,101 @@ public class KTableKTableJoinIntegrationTest {
INNER, LEFT, OUTER
}
- private KafkaStreams prepareTopology() {
+
+ @Test
+ public void shouldInnerInnerJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+ }
+
+ @Test
+ public void shouldInnerLeftJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")));
+ }
+
+ @Test
+ public void shouldInnerOuterJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("b", "B1-B2-B3")));
+ }
+
+ @Test
+ public void shouldLeftInnerJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3")));
+ }
+
+ @Test
+ public void shouldLeftLeftJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3")));
+ }
+
+ @Test
+ public void shouldLeftOuterJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3")));
+ }
+
+ @Test
+ public void shouldOuterInnerJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")));
+ }
+
+ @Test
+ public void shouldOuterLeftJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")));
+ }
+
+ @Test
+ public void shouldOuterOuterJoin() throws Exception {
+ verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")));
+ }
+
+
+ private void verifyKTableKTableJoin(final JoinType joinType1,
+ final JoinType joinType2,
+ final List<KeyValue<String, String>> expectedResult) throws Exception {
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+
+ streams = prepareTopology(joinType1, joinType2);
+ streams.start();
+
+
+ final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ CONSUMER_CONFIG,
+ OUTPUT,
+ expectedResult.size());
+
+ assertThat(result, equalTo(expectedResult));
+ }
+ private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2) {
final KStreamBuilder builder = new KStreamBuilder();
final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
@@ -226,20 +256,4 @@ public class KTableKTableJoinIntegrationTest {
throw new RuntimeException("Unknown join type.");
}
- @Test
- public void KTableKTableJoin() throws Exception {
- streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
-
- streams = prepareTopology();
- streams.start();
-
-
- final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- CONSUMER_CONFIG,
- OUTPUT,
- expectedResult.size());
-
- assertThat(result, equalTo(expectedResult));
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index a2b56d3..012462a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -55,10 +56,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
@@ -77,7 +75,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
-@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
public class QueryableStateIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@@ -85,7 +83,7 @@ public class QueryableStateIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER =
new EmbeddedKafkaCluster(NUM_BROKERS);
- public static final int STREAM_THREE_PARTITIONS = 4;
+ private static final int STREAM_THREE_PARTITIONS = 4;
private final MockTime mockTime = CLUSTER.time;
private String streamOne = "stream-one";
private String streamTwo = "stream-two";
@@ -106,7 +104,7 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, Long>> stringLongComparator;
private static int testNo = 0;
- public void createTopics() throws InterruptedException {
+ private void createTopics() throws InterruptedException {
streamOne = streamOne + "-" + testNo;
streamConcurrent = streamConcurrent + "-" + testNo;
streamThree = streamThree + "-" + testNo;
@@ -123,15 +121,6 @@ public class QueryableStateIntegrationTest {
CLUSTER.createTopic(outputTopicThree);
}
- @Parameter
- public long cacheSizeBytes;
-
- //Single parameter, use Object[]
- @Parameters
- public static Object[] data() {
- return new Object[]{0, 10 * 1024 * 1024L};
- }
-
@Before
public void before() throws IOException, InterruptedException {
testNo++;
@@ -147,8 +136,7 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
stringComparator = new Comparator<KeyValue<String, String>>() {
@@ -426,7 +414,17 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void shouldBeAbleToQueryState() throws Exception {
+ public void shouldBeAbleToQueryStateWithZeroSizedCache() throws Exception {
+ verifyCanQueryState(0);
+ }
+
+ @Test
+ public void shouldBeAbleToQueryStateWithNonZeroSizedCache() throws Exception {
+ verifyCanQueryState(10 * 1024 * 1024);
+ }
+
+ private void verifyCanQueryState(int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final KStreamBuilder builder = new KStreamBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
@@ -446,13 +444,13 @@ public class QueryableStateIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
- batch1,
- TestUtils.producerConfig(
+ batch1,
+ TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
- mockTime);
+ mockTime);
final KStream<String, String> s1 = builder.stream(streamOne);
@@ -477,7 +475,6 @@ public class QueryableStateIntegrationTest {
myCount);
verifyRangeAndAll(expectedCount, myCount);
-
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e115861..a84a208 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -46,6 +47,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -65,6 +67,7 @@ import static org.junit.Assert.fail;
* End-to-end integration test based on using regex and named topics for creating sources, using
* an embedded Kafka cluster.
*/
+@Category({IntegrationTest.class})
public class RegexSourceIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
http://git-wip-us.apache.org/repos/asf/kafka/blob/fef7fca2/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 88a8545..ce29f32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
@@ -47,6 +48,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import java.util.Collections;
import java.util.HashSet;
@@ -60,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
/**
* Tests local state store and global application cleanup.
*/
+@Category({IntegrationTest.class})
public class ResetIntegrationTest {
private static final int NUM_BROKERS = 1;