You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/31 22:29:05 UTC
[2/8] kafka git commit: KAFKA-5671: Add StreamsBuilder and Deprecate KStreamBuilder
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index d47b1e6..b099241 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -18,22 +18,22 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.List;
-import java.util.Locale;
-import java.util.ArrayList;
-import java.util.Arrays;
+
import java.io.File;
import java.io.IOException;
-
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
import static org.junit.Assert.assertEquals;
@@ -86,7 +86,7 @@ public class KTableForeachTest {
};
// When
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName");
table.foreach(action);
@@ -112,7 +112,7 @@ public class KTableForeachTest {
public void apply(Number key, Object value) {}
};
- new KStreamBuilder()
+ new StreamsBuilder()
.<Integer, String>table("emptyTopic", "emptyStore")
.foreach(consume);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 1fbaebf..63ea763 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -19,16 +19,14 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
@@ -41,9 +39,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Field;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -56,7 +54,7 @@ public class KTableImplTest {
private KStreamTestDriver driver = null;
private File stateDir = null;
- private KStreamBuilder builder;
+ private StreamsBuilder builder;
private KTable<String, String> table;
@After
@@ -70,13 +68,13 @@ public class KTableImplTest {
@Before
public void setUp() throws IOException {
stateDir = TestUtils.tempDirectory("kafka-test");
- builder = new KStreamBuilder();
+ builder = new StreamsBuilder();
table = builder.table("test", "test");
}
@Test
public void testKTable() {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
@@ -133,7 +131,7 @@ public class KTableImplTest {
@Test
public void testValueGetter() throws IOException {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
@@ -266,12 +264,11 @@ public class KTableImplTest {
String storeName1 = "storeName1";
String storeName2 = "storeName2";
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
- KTableImpl<String, String, String> table2 =
- (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2);
+ builder.table(stringSerde, stringSerde, topic2, storeName2);
KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@@ -280,7 +277,7 @@ public class KTableImplTest {
return new Integer(value);
}
});
- KTableImpl<String, Integer, Integer> table1MappedFiltered = (KTableImpl<String, Integer, Integer>) table1Mapped.filter(
+ table1Mapped.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
@@ -302,7 +299,7 @@ public class KTableImplTest {
String storeName1 = "storeName1";
String storeName2 = "storeName2";
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
@@ -343,19 +340,17 @@ public class KTableImplTest {
String topic1 = "topic1";
String storeName1 = "storeName1";
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
- KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
- .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
- .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
+ table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");
- KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
- .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
- .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
+ table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+ .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
driver.setTime(0L);
@@ -405,16 +400,19 @@ public class KTableImplTest {
table.mapValues(null);
}
+ @SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
table.writeAsText(null);
}
- @Test(expected = TopologyBuilderException.class)
+ @SuppressWarnings("deprecation")
+ @Test(expected = TopologyException.class)
public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
table.writeAsText("\t \t");
}
+ @SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullActionOnForEach() throws Exception {
table.foreach(null);
@@ -442,22 +440,22 @@ public class KTableImplTest {
@Test
public void shouldAllowNullStoreInJoin() throws Exception {
- table.join(table, MockValueJoiner.TOSTRING_JOINER, null, (String) null);
+ table.join(table, MockValueJoiner.TOSTRING_JOINER, null, null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullStoreSupplierInJoin() throws Exception {
- table.join(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+ table.join(table, MockValueJoiner.TOSTRING_JOINER, null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullStoreSupplierInLeftJoin() throws Exception {
- table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+ table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullStoreSupplierInOuterJoin() throws Exception {
- table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier<KeyValueStore>) null);
+ table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, null);
}
@Test(expected = NullPointerException.class)
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index fdc07f3..668e7f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -31,6 +31,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
@@ -67,11 +68,24 @@ public class KTableKTableJoinTest {
stateDir = TestUtils.tempDirectory("kafka-test");
}
- private void doTestJoin(final KStreamBuilder builder,
+ public static Collection<Set<String>> getCopartitionedGroups(StreamsBuilder builder) {
+ // TODO: we should refactor this to avoid usage of reflection
+ try {
+ final Field internalStreamsBuilderField = builder.getClass().getDeclaredField("internalStreamsBuilder");
+ internalStreamsBuilderField.setAccessible(true);
+ final InternalStreamsBuilder internalStreamsBuilder = (InternalStreamsBuilder) internalStreamsBuilderField.get(builder);
+
+ return internalStreamsBuilder.internalTopologyBuilder.copartitionGroups();
+ } catch (final NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void doTestJoin(final StreamsBuilder builder,
final int[] expectedKeys,
final MockProcessorSupplier<Integer, String> processor,
final KTable<Integer, String> joined) {
- final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ final Collection<Set<String>> copartitionGroups = getCopartitionedGroups(builder);
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -163,7 +177,7 @@ public class KTableKTableJoinTest {
@Test
public void testJoin() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -184,7 +198,7 @@ public class KTableKTableJoinTest {
@Test
public void testQueryableJoin() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -202,7 +216,7 @@ public class KTableKTableJoinTest {
doTestJoin(builder, expectedKeys, processor, joined);
}
- private void doTestSendingOldValues(final KStreamBuilder builder,
+ private void doTestSendingOldValues(final StreamsBuilder builder,
final int[] expectedKeys,
final KTable<Integer, String> table1,
final KTable<Integer, String> table2,
@@ -285,7 +299,7 @@ public class KTableKTableJoinTest {
@Test
public void testNotSendingOldValues() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -298,7 +312,7 @@ public class KTableKTableJoinTest {
table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
@@ -306,7 +320,7 @@ public class KTableKTableJoinTest {
@Test
public void testQueryableNotSendingOldValues() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -319,7 +333,7 @@ public class KTableKTableJoinTest {
table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false);
@@ -327,7 +341,7 @@ public class KTableKTableJoinTest {
@Test
public void testSendingOldValues() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -341,7 +355,7 @@ public class KTableKTableJoinTest {
joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true);
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index dce19e3..7913902 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -74,7 +74,7 @@ public class KTableKTableLeftJoinTest {
@Test
public void testJoin() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -85,7 +85,7 @@ public class KTableKTableLeftJoinTest {
processor = new MockProcessorSupplier<>();
joined.toStream().process(processor);
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -168,7 +168,7 @@ public class KTableKTableLeftJoinTest {
@Test
public void testNotSendingOldValue() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -182,7 +182,7 @@ public class KTableKTableLeftJoinTest {
joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
driver = new KStreamTestDriver(builder, stateDir);
driver.setTime(0L);
@@ -249,7 +249,7 @@ public class KTableKTableLeftJoinTest {
@Test
public void testSendingOldValue() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -265,7 +265,7 @@ public class KTableKTableLeftJoinTest {
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
driver = new KStreamTestDriver(builder, stateDir);
driver.setTime(0L);
@@ -346,7 +346,7 @@ public class KTableKTableLeftJoinTest {
final String tableSix = "tableSix";
final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix};
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(), agg, agg)
.groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index a60eae3..1ea86dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -69,7 +69,7 @@ public class KTableKTableOuterJoinTest {
@Test
public void testJoin() throws Exception {
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -84,7 +84,7 @@ public class KTableKTableOuterJoinTest {
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
joined.toStream().process(processor);
- Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+ Collection<Set<String>> copartitionGroups = KTableKTableJoinTest.getCopartitionedGroups(builder);
assertEquals(1, copartitionGroups.size());
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -174,7 +174,7 @@ public class KTableKTableOuterJoinTest {
@Test
public void testNotSendingOldValue() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -188,7 +188,7 @@ public class KTableKTableOuterJoinTest {
joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
driver = new KStreamTestDriver(builder, stateDir);
@@ -262,7 +262,7 @@ public class KTableKTableOuterJoinTest {
@Test
public void testSendingOldValue() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -278,7 +278,7 @@ public class KTableKTableOuterJoinTest {
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
+ builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
driver = new KStreamTestDriver(builder, stateDir);
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 5ff01e6..9c5955a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -19,14 +19,13 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +60,7 @@ public class KTableMapKeysTest {
@Test
public void testMapKeysConvertingToStream() {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic_map_keys";
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 70b1a55..6bdf83d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -58,7 +58,7 @@ public class KTableMapValuesTest {
stateDir = TestUtils.tempDirectory("kafka-test");
}
- private void doTestKTable(final KStreamBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
+ private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());
driver.process(topic1, "A", "1");
@@ -71,7 +71,7 @@ public class KTableMapValuesTest {
@Test
public void testKTable() {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -91,7 +91,7 @@ public class KTableMapValuesTest {
@Test
public void testQueryableKTable() {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -109,7 +109,7 @@ public class KTableMapValuesTest {
doTestKTable(builder, topic1, proc2);
}
- private void doTestValueGetter(final KStreamBuilder builder,
+ private void doTestValueGetter(final StreamsBuilder builder,
final String topic1,
final KTableImpl<String, String, String> table1,
final KTableImpl<String, String, Integer> table2,
@@ -212,7 +212,7 @@ public class KTableMapValuesTest {
@Test
public void testValueGetter() throws IOException {
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
@@ -243,7 +243,7 @@ public class KTableMapValuesTest {
@Test
public void testQueryableValueGetter() throws IOException {
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
@@ -274,7 +274,7 @@ public class KTableMapValuesTest {
@Test
public void testNotSendingOldValue() throws IOException {
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -290,7 +290,7 @@ public class KTableMapValuesTest {
MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, table2.name);
+ builder.build().addProcessor("proc", proc, table2.name);
driver = new KStreamTestDriver(builder, stateDir, null, null);
assertFalse(table1.sendingOldValueEnabled());
@@ -322,7 +322,7 @@ public class KTableMapValuesTest {
@Test
public void testSendingOldValue() throws IOException {
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -340,7 +340,7 @@ public class KTableMapValuesTest {
MockProcessorSupplier<String, Integer> proc = new MockProcessorSupplier<>();
- builder.addProcessor("proc", proc, table2.name);
+ builder.build().addProcessor("proc", proc, table2.name);
driver = new KStreamTestDriver(builder, stateDir, null, null);
assertTrue(table1.sendingOldValueEnabled());
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 20a6e9a..77d2b19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
@@ -58,7 +58,7 @@ public class KTableSourceTest {
@Test
public void testKTable() {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -82,7 +82,7 @@ public class KTableSourceTest {
@Test
public void testValueGetter() throws IOException {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -126,7 +126,7 @@ public class KTableSourceTest {
@Test
public void testNotSendingOldValue() throws IOException {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -134,7 +134,7 @@ public class KTableSourceTest {
MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- builder.addProcessor("proc1", proc1, table1.name);
+ builder.build().addProcessor("proc1", proc1, table1.name);
driver = new KStreamTestDriver(builder, stateDir, null, null);
driver.process(topic1, "A", "01");
@@ -164,7 +164,7 @@ public class KTableSourceTest {
@Test
public void testSendingOldValue() throws IOException {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -176,7 +176,7 @@ public class KTableSourceTest {
MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
- builder.addProcessor("proc1", proc1, table1.name);
+ builder.build().addProcessor("proc1", proc1, table1.name);
driver = new KStreamTestDriver(builder, stateDir, null, null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index ec3ea0e..7453cc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -32,14 +32,14 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -51,9 +51,9 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
-import java.util.concurrent.CountDownLatch;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -280,13 +280,13 @@ public class SimpleBenchmark {
private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, byte[]> input = builder.stream(topic);
input.groupByKey()
.count("tmpStoreName").foreach(new CountDownAction(latch));
- return new KafkaStreams(builder, streamConfig);
+ return new KafkaStreams(builder.build(), streamConfig);
}
@@ -584,7 +584,7 @@ public class SimpleBenchmark {
private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
Properties props = setStreamProperties("simple-benchmark-streams");
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
@@ -623,7 +623,7 @@ public class SimpleBenchmark {
private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
final Properties props = setStreamProperties("simple-benchmark-streams-with-sink");
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
@@ -682,7 +682,7 @@ public class SimpleBenchmark {
private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic,
String kTableTopic, final CountDownLatch latch) {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
@@ -694,7 +694,7 @@ public class SimpleBenchmark {
private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
String kTableTopic2, final CountDownLatch latch) {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
@@ -706,7 +706,7 @@ public class SimpleBenchmark {
private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
String kStreamTopic2, final CountDownLatch latch) {
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic1);
final KStream<Long, byte[]> input2 = builder.stream(kStreamTopic2);
@@ -722,7 +722,7 @@ public class SimpleBenchmark {
boolean enableCaching) {
Properties props = setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
if (enableCaching) {
builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build());
@@ -767,8 +767,8 @@ public class SimpleBenchmark {
return createKafkaStreamsWithExceptionHandler(builder, props);
}
- private KafkaStreams createKafkaStreamsWithExceptionHandler(final KStreamBuilder builder, final Properties props) {
- final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+ private KafkaStreams createKafkaStreamsWithExceptionHandler(final StreamsBuilder builder, final Properties props) {
+ final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 56c578f..02e532d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -26,9 +26,9 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
@@ -36,13 +36,13 @@ import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
-import java.util.List;
-import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
@@ -280,7 +280,7 @@ public class YahooBenchmark {
serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
projectedEventDeserializer.configure(serdeProps, false);
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, ProjectedEvent> kEvents = builder.stream(Serdes.String(),
Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), eventsTopic);
final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
@@ -356,6 +356,6 @@ public class YahooBenchmark {
.groupByKey(Serdes.String(), Serdes.String())
.count(TimeWindows.of(10 * 1000), "time-windows");
- return new KafkaStreams(builder, streamConfig);
+ return new KafkaStreams(builder.build(), streamConfig);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 07be6e8..a947218 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -40,9 +40,9 @@ import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.St
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class GlobalStreamThreadTest {
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 9ac2f99..4022a47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -47,6 +47,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -328,9 +329,15 @@ public class StandbyTaskTest {
restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
- final KStreamBuilder builder = new KStreamBuilder();
- builder.stream("topic").groupByKey().count("my-store");
- final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
+ final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+ builder.stream(null, null, null, null, "topic").groupByKey().count("my-store");
+
+ // TODO: we should refactor this to avoid usage of reflection
+ final Field internalTopologyBuilderField = builder.getClass().getDeclaredField("internalTopologyBuilder");
+ internalTopologyBuilderField.setAccessible(true);
+ final InternalTopologyBuilder internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(builder);
+
+ final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
StreamsConfig config = createConfig(baseDir);
new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 33e43e2..7b2956b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -28,12 +28,13 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@@ -934,11 +935,13 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
+ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() throws Exception {
final String applicationId = "application-id";
- final KStreamBuilder builder = new KStreamBuilder();
- builder.setApplicationId(applicationId);
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
+ internalTopologyBuilder.setApplicationId(applicationId);
KStream<Object, Object> stream1 = builder
@@ -997,7 +1000,7 @@ public class StreamPartitionAssignorTest {
final String client = "client1";
final StreamThread streamThread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
mockClientSupplier,
applicationId,
@@ -1005,7 +1008,7 @@ public class StreamPartitionAssignorTest {
uuid,
new Metrics(),
Time.SYSTEM,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -1077,19 +1080,22 @@ public class StreamPartitionAssignorTest {
@Test
public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+ final String applicationId = "appId";
final Properties props = configProps();
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
final StreamsConfig config = new StreamsConfig(props);
- final KStreamBuilder builder = new KStreamBuilder();
- final String applicationId = "appId";
- builder.setApplicationId(applicationId);
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
+ internalTopologyBuilder.setApplicationId(applicationId);
+
builder.stream("topic1").groupByKey().count("count");
final UUID uuid = UUID.randomUUID();
final String client = "client1";
final StreamThread streamThread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
mockClientSupplier,
applicationId,
@@ -1097,7 +1103,7 @@ public class StreamPartitionAssignorTest {
uuid,
new Metrics(),
Time.SYSTEM,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 29edf6a..026d731 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -34,7 +34,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -93,7 +93,8 @@ public class StreamThreadTest {
private final Metrics metrics = new Metrics();
private final MockClientSupplier clientSupplier = new MockClientSupplier();
private UUID processId = UUID.randomUUID();
- final KStreamBuilder builder = new KStreamBuilder();
+ private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
+ private InternalTopologyBuilder internalTopologyBuilder;
private final StreamsConfig config = new StreamsConfig(configProps(false));
private final String stateDir = TestUtils.tempDirectory().getPath();
private final StateDirectory stateDirectory = new StateDirectory("applicationId", stateDir, mockTime);
@@ -102,7 +103,13 @@ public class StreamThreadTest {
@Before
public void setUp() throws Exception {
processId = UUID.randomUUID();
- builder.setApplicationId(applicationId);
+
+ // TODO: we should refactor this to avoid usage of reflection
+ final Field internalTopologyBuilderField = internalStreamsBuilder.getClass().getDeclaredField("internalTopologyBuilder");
+ internalTopologyBuilderField.setAccessible(true);
+ internalTopologyBuilder = (InternalTopologyBuilder) internalTopologyBuilderField.get(internalStreamsBuilder);
+
+ internalTopologyBuilder.setApplicationId(applicationId);
}
private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -258,7 +265,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@Test
public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
- builder.addSource("source1", "topic1");
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
final StreamThread thread = getStreamThread();
@@ -352,10 +359,10 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@Test
public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
- builder.addSource("source1", "topic1");
- builder.addSource("source2", "topic2");
- builder.addSource("source3", "topic3");
- builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
+ internalTopologyBuilder.addSource(null, "source2", null, null, null, "topic2");
+ internalTopologyBuilder.addSource(null, "source3", null, null, null, "topic3");
+ internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
final StreamThread thread = getStreamThread();
@@ -448,7 +455,7 @@ public class StreamThreadTest {
public void testStateChangeStartClose() throws InterruptedException {
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -457,7 +464,7 @@ public class StreamThreadTest {
metrics,
Time.SYSTEM,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -489,7 +496,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@Test
public void testHandingOverTaskFromOneToAnotherThread() throws InterruptedException {
- builder.addStateStore(
+ internalTopologyBuilder.addStateStore(
Stores
.create("store")
.withByteArrayKeys()
@@ -497,12 +504,12 @@ public class StreamThreadTest {
.persistent()
.build()
);
- builder.addSource("source", TOPIC);
+ internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
//clientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
final StreamThread thread1 = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -510,11 +517,11 @@ public class StreamThreadTest {
processId,
metrics,
Time.SYSTEM,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
final StreamThread thread2 = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -522,7 +529,7 @@ public class StreamThreadTest {
processId,
metrics,
Time.SYSTEM,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -627,7 +634,7 @@ public class StreamThreadTest {
@Test
public void testMetrics() {
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -635,7 +642,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0, stateDirectory);
final String defaultGroupName = "stream-metrics";
final String defaultPrefix = "thread." + thread.threadClientId();
@@ -678,10 +685,10 @@ public class StreamThreadTest {
final StreamsConfig config = new StreamsConfig(props);
- builder.addSource("source1", "topic1");
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -689,7 +696,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0, stateDirectory) {
@Override
@@ -770,10 +777,10 @@ public class StreamThreadTest {
@Test
public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException {
- builder.addSource("source1", "someTopic");
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -781,7 +788,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -806,11 +813,11 @@ public class StreamThreadTest {
@Test
public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException {
- builder.addSource("source1", "someTopic");
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
new StreamsConfig(configProps(true)),
clientSupplier,
applicationId,
@@ -818,7 +825,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -846,11 +853,11 @@ public class StreamThreadTest {
@Test
public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
- builder.addSource("source1", "someTopic");
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
new StreamsConfig(configProps(true)),
clientSupplier,
applicationId,
@@ -858,7 +865,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -881,10 +888,10 @@ public class StreamThreadTest {
@Test
public void shouldCloseThreadProducerOnCloseIfEosDisabled() throws InterruptedException {
- builder.addSource("source1", "someTopic");
+ internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic");
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -892,7 +899,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -913,10 +920,10 @@ public class StreamThreadTest {
@Test
public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws InterruptedException {
- builder.addSource("name", "topic").addSink("out", "output");
+ internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
+ internalTopologyBuilder.addSink("out", "output", null, null, null);
- final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ final StreamThread thread = new StreamThread(internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -924,7 +931,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -942,13 +949,14 @@ public class StreamThreadTest {
@Test
public void shouldNotCloseSuspendedTaskswice() throws Exception {
- builder.addSource("name", "topic").addSink("out", "output");
+ internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
+ internalTopologyBuilder.addSink("out", "output", null, null, null);
final TestStreamTask testStreamTask = new TestStreamTask(
new TaskId(0, 0),
applicationId,
Utils.mkSet(new TopicPartition("topic", 0)),
- builder.build(0),
+ internalTopologyBuilder.build(0),
clientSupplier.consumer,
clientSupplier.getProducer(new HashMap<String, Object>()),
clientSupplier.restoreConsumer,
@@ -957,7 +965,7 @@ public class StreamThreadTest {
new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime));
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -965,7 +973,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory) {
@@ -1005,11 +1013,11 @@ public class StreamThreadTest {
@Test
public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() {
- builder.stream("t1").groupByKey().count("count-one");
- builder.stream("t2").groupByKey().count("count-two");
+ internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+ internalStreamsBuilder.stream(null, null, null, null, "t2").groupByKey().count("count-two");
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1017,7 +1025,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -1063,11 +1071,11 @@ public class StreamThreadTest {
@Test
public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
- builder.stream("t1").groupByKey().count("count-one");
- builder.stream("t2").groupByKey().count("count-two");
+ internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+ internalStreamsBuilder.stream(null, null, null, null, "t2").groupByKey().count("count-two");
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1075,7 +1083,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1135,12 +1143,12 @@ public class StreamThreadTest {
@Test
public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
- builder.stream(Pattern.compile("t.*")).to("out");
+ internalStreamsBuilder.stream(null, null, null, null, Pattern.compile("t.*")).to("out");
final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>();
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1148,7 +1156,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory) {
@@ -1190,7 +1198,7 @@ public class StreamThreadTest {
updatedTopicsField.setAccessible(true);
final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
updatedTopics.add(t1.topic());
- builder.updateSubscriptions(subscriptionUpdates, null);
+ internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, null);
// should create task for id 0_0 with a single partition
thread.setState(StreamThread.State.RUNNING);
@@ -1217,11 +1225,12 @@ public class StreamThreadTest {
@Test
public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
- builder.addSource("source", TOPIC).addSink("sink", "dummyTopic", "source");
+ internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
+ internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
new StreamsConfig(configProps(true)),
clientSupplier,
applicationId,
@@ -1229,7 +1238,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -1314,11 +1323,12 @@ public class StreamThreadTest {
@Test
public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception {
- builder.addSource("name", "topic").addSink("out", "output");
+ internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
+ internalTopologyBuilder.addSink("out", "output", null, null, null);
final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId);
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
new StreamsConfig(configProps(true)),
clientSupplier,
applicationId,
@@ -1326,7 +1336,7 @@ public class StreamThreadTest {
processId,
new Metrics(),
new MockTime(),
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -1352,13 +1362,13 @@ public class StreamThreadTest {
@Test
public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
- builder.stream("t1").groupByKey();
+ internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey();
final TestStreamTask testStreamTask = new TestStreamTask(
new TaskId(0, 0),
applicationId,
Utils.mkSet(new TopicPartition("t1", 0)),
- builder.build(0),
+ internalTopologyBuilder.build(0),
clientSupplier.consumer,
clientSupplier.getProducer(new HashMap<String, Object>()),
clientSupplier.restoreConsumer,
@@ -1373,7 +1383,7 @@ public class StreamThreadTest {
};
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1381,7 +1391,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory) {
@@ -1408,12 +1418,12 @@ public class StreamThreadTest {
@Test
public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false);
- builder.stream("t1").groupByKey().count(new MockStateStoreSupplier(stateStore));
+ internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count(new MockStateStoreSupplier(stateStore));
final TestStreamTask testStreamTask = new TestStreamTask(
new TaskId(0, 0),
applicationId,
Utils.mkSet(new TopicPartition("t1", 0)),
- builder.build(0),
+ internalTopologyBuilder.build(0),
clientSupplier.consumer,
clientSupplier.getProducer(new HashMap<String, Object>()),
clientSupplier.restoreConsumer,
@@ -1428,7 +1438,7 @@ public class StreamThreadTest {
};
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1436,7 +1446,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory) {
@@ -1474,13 +1484,13 @@ public class StreamThreadTest {
@Test
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws Exception {
- builder.stream("t1").groupByKey();
+ internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey();
final TestStreamTask testStreamTask = new TestStreamTask(
new TaskId(0, 0),
applicationId,
Utils.mkSet(new TopicPartition("t1", 0)),
- builder.build(0),
+ internalTopologyBuilder.build(0),
clientSupplier.consumer,
clientSupplier.getProducer(new HashMap<String, Object>()),
clientSupplier.restoreConsumer,
@@ -1495,7 +1505,7 @@ public class StreamThreadTest {
};
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1503,7 +1513,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory) {
@@ -1534,13 +1544,13 @@ public class StreamThreadTest {
@Test
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception {
- builder.stream("t1").groupByKey();
+ internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey();
final TestStreamTask testStreamTask = new TestStreamTask(
new TaskId(0, 0),
applicationId,
Utils.mkSet(new TopicPartition("t1", 0)),
- builder.build(0),
+ internalTopologyBuilder.build(0),
clientSupplier.consumer,
clientSupplier.getProducer(new HashMap<String, Object>()),
clientSupplier.restoreConsumer,
@@ -1555,7 +1565,7 @@ public class StreamThreadTest {
};
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1563,7 +1573,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory) {
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory) {
@Override
protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1594,11 +1604,11 @@ public class StreamThreadTest {
@Test
@SuppressWarnings("unchecked")
public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
- builder.addSource("source", Pattern.compile("t.*"));
- builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+ internalTopologyBuilder.addSource(null, "source", null, null, null, Pattern.compile("t.*"));
+ internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source");
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1606,7 +1616,7 @@ public class StreamThreadTest {
processId,
metrics,
mockTime,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory);
@@ -1619,13 +1629,12 @@ public class StreamThreadTest {
thread.setPartitionAssignor(partitionAssignor);
- final Field
- nodeToSourceTopicsField =
- builder.internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
+ final Field nodeToSourceTopicsField =
+ internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
nodeToSourceTopicsField.setAccessible(true);
final Map<String, List<String>>
nodeToSourceTopics =
- (Map<String, List<String>>) nodeToSourceTopicsField.get(builder.internalTopologyBuilder);
+ (Map<String, List<String>>) nodeToSourceTopicsField.get(internalTopologyBuilder);
final List<TopicPartition> topicPartitions = new ArrayList<>();
final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
@@ -1833,7 +1842,7 @@ public class StreamThreadTest {
final String storeName = "store";
final String changelogTopic = applicationId + "-" + storeName + "-changelog";
- builder.stream("topic1").groupByKey().count(storeName);
+ internalStreamsBuilder.stream(null, null, null, null, "topic1").groupByKey().count(storeName);
final MockClientSupplier clientSupplier = new MockClientSupplier();
clientSupplier.restoreConsumer.updatePartitions(changelogTopic,
@@ -1850,7 +1859,7 @@ public class StreamThreadTest {
});
final StreamThread thread = new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1858,7 +1867,7 @@ public class StreamThreadTest {
processId,
new Metrics(),
new MockTime(),
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory) {
@@ -1933,7 +1942,7 @@ public class StreamThreadTest {
private StreamThread getStreamThread() {
return new StreamThread(
- builder.internalTopologyBuilder,
+ internalTopologyBuilder,
config,
clientSupplier,
applicationId,
@@ -1941,7 +1950,7 @@ public class StreamThreadTest {
processId,
metrics,
Time.SYSTEM,
- new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
+ new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
0,
stateDirectory) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 8ee1d6e..d394abe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -23,10 +23,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
@@ -56,7 +57,7 @@ public class StreamsMetadataStateTest {
private TopicPartition topic2P0;
private TopicPartition topic3P0;
private Map<HostInfo, Set<TopicPartition>> hostToPartitions;
- private KStreamBuilder builder;
+ private StreamsBuilder builder;
private TopicPartition topic1P1;
private TopicPartition topic2P1;
private TopicPartition topic4P0;
@@ -66,8 +67,8 @@ public class StreamsMetadataStateTest {
private StreamPartitioner<String, Object> partitioner;
@Before
- public void before() {
- builder = new KStreamBuilder();
+ public void before() throws Exception {
+ builder = new StreamsBuilder();
final KStream<Object, Object> one = builder.stream("topic-one");
one.groupByKey().count("table-one");
@@ -89,7 +90,7 @@ public class StreamsMetadataStateTest {
builder.globalTable("global-topic", "global-table");
- builder.setApplicationId("appId");
+ InternalStreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
topic1P0 = new TopicPartition("topic-one", 0);
topic1P1 = new TopicPartition("topic-one", 1);
@@ -115,7 +116,7 @@ public class StreamsMetadataStateTest {
new PartitionInfo("topic-four", 0, null, null, null));
cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet());
- discovery = new StreamsMetadataState(builder.internalTopologyBuilder, hostOne);
+ discovery = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), hostOne);
discovery.onChange(hostToPartitions, cluster);
partitioner = new StreamPartitioner<String, Object>() {
@Override
@@ -127,7 +128,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception {
- new StreamsMetadataState(builder.internalTopologyBuilder, hostOne).getAllMetadataForStore("store");
+ new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store");
}
@Test
@@ -294,7 +295,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception {
- final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
+ final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
streamsMetadataState.onChange(hostToPartitions, cluster);
assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()));
}
@@ -307,7 +308,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception {
- final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST);
+ final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(InternalStreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
streamsMetadataState.onChange(hostToPartitions, cluster);
assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/da220557/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index c01e169..d7078f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.test.TestUtils;
import java.io.File;
@@ -70,10 +70,10 @@ public class BrokerCompatibilityTest {
streamsProperties.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout + 1);
- final KStreamBuilder builder = new KStreamBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
builder.stream(SOURCE_TOPIC).to(SINK_TOPIC);
- final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {