You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/08/10 21:25:32 UTC
[1/2] kafka git commit: KAFKA-3914: Global discovery of state stores
Repository: kafka
Updated Branches:
refs/heads/trunk caa9bd0fc -> 68b5d014f
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 21de73a..9d261bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
@@ -31,10 +32,12 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
@@ -85,6 +88,7 @@ public class StreamPartitionAssignorTest {
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
private final TaskId task3 = new TaskId(0, 3);
+ private String userEndPoint = null;
private Properties configProps() {
return new Properties() {
@@ -115,7 +119,7 @@ public class StreamPartitionAssignorTest {
String clientId = "client-id";
UUID processId = UUID.randomUUID();
- StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), new SystemTime()) {
+ StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {
@Override
public Set<TaskId> prevTasks() {
return prevTasks;
@@ -137,7 +141,7 @@ public class StreamPartitionAssignorTest {
Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
standbyTasks.removeAll(prevTasks);
- SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
+ SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
assertEquals(info.encode(), subscription.userData());
}
@@ -163,18 +167,18 @@ public class StreamPartitionAssignorTest {
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
- StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -229,18 +233,18 @@ public class StreamPartitionAssignorTest {
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
- StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet(), userEndPoint).encode()));
subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -297,18 +301,18 @@ public class StreamPartitionAssignorTest {
String client1 = "client1";
- StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -352,18 +356,18 @@ public class StreamPartitionAssignorTest {
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
- StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
subscriptions.put("consumer11",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
subscriptions.put("consumer20",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -454,7 +458,7 @@ public class StreamPartitionAssignorTest {
UUID uuid = UUID.randomUUID();
String client1 = "client1";
- StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), new SystemTime());
+ StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
@@ -464,7 +468,7 @@ public class StreamPartitionAssignorTest {
standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
- AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
+ AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
partitionAssignor.onAssignment(assignment);
@@ -493,7 +497,7 @@ public class StreamPartitionAssignorTest {
MockClientSupplier clientSupplier = new MockClientSupplier();
- StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -503,7 +507,7 @@ public class StreamPartitionAssignorTest {
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
partitionAssignor.assign(metadata, subscriptions);
@@ -535,7 +539,7 @@ public class StreamPartitionAssignorTest {
MockClientSupplier clientSupplier = new MockClientSupplier();
- StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -545,7 +549,7 @@ public class StreamPartitionAssignorTest {
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
subscriptions.put("consumer10",
- new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+ new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
@@ -554,6 +558,139 @@ public class StreamPartitionAssignorTest {
assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ"));
}
+ /**
+ * The endpoint set through {@code StreamsConfig.APPLICATION_SERVER_CONFIG} must be carried
+ * in the user data of the subscription created by the assignor.
+ */
+ @Test
+ public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+ final String endPoint = "localhost:8080";
+ final String appId = "application-id";
+ final String clientId = "client1";
+ final UUID processId = UUID.randomUUID();
+
+ final Properties props = configProps();
+ props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endPoint);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+ // Minimal source -> processor -> sink topology reading from "input".
+ final TopologyBuilder topology = new TopologyBuilder();
+ topology.setApplicationId(appId);
+ topology.addSource("source", "input");
+ topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ topology.addSink("sink", "output", "processor");
+
+ final StreamThread thread = new StreamThread(topology, streamsConfig, new MockClientSupplier(), appId, clientId, processId,
+ new Metrics(), new SystemTime(), new StreamsMetadataState(topology));
+
+ final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
+ assignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));
+
+ // Decode the subscription's user data and check the endpoint made the round trip.
+ final SubscriptionInfo decoded = SubscriptionInfo.decode(assignor.subscription(Utils.mkSet("input")).userData());
+ assertEquals(endPoint, decoded.userEndPoint);
+ }
+
+ /**
+ * When a subscription advertises a user endpoint, the assignment handed back to that
+ * consumer must map the matching {@code HostInfo} to partitions 0, 1 and 2 of "topic1".
+ */
+ @Test
+ public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+ final String endPoint = "localhost:8080";
+ final String appId = "application-id";
+ final String clientId = "client1";
+ final UUID processId = UUID.randomUUID();
+
+ final Properties props = configProps();
+ props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endPoint);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+ final TopologyBuilder topology = new TopologyBuilder();
+ topology.setApplicationId(appId);
+ topology.addSource("source", "topic1");
+ topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+ topology.addSink("sink", "output", "processor");
+
+ final StreamThread thread = new StreamThread(topology, streamsConfig, new MockClientSupplier(), appId, clientId, processId,
+ new Metrics(), new SystemTime(), new StreamsMetadataState(topology));
+
+ final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
+ assignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));
+
+ // One consumer with no previous or standby tasks, advertising the endpoint.
+ final Set<TaskId> noTasks = Collections.<TaskId>emptySet();
+ final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(processId, noTasks, noTasks, endPoint);
+ final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ subscriptions.put("consumer1",
+ new PartitionAssignor.Subscription(Utils.mkList("topic1"), subscriptionInfo.encode()));
+
+ final PartitionAssignor.Assignment assignment = assignor.assign(metadata, subscriptions).get("consumer1");
+ final AssignmentInfo decoded = AssignmentInfo.decode(assignment.userData());
+
+ final Set<TopicPartition> expected = Utils.mkSet(new TopicPartition("topic1", 0),
+ new TopicPartition("topic1", 1),
+ new TopicPartition("topic1", 2));
+ assertEquals(expected, decoded.partitionsByHostState.get(new HostInfo("localhost", 8080)));
+ }
+
+ /**
+ * Configuring the assignor must fail with a {@code ConfigException} when
+ * {@code StreamsConfig.APPLICATION_SERVER_CONFIG} holds a bare host name ("localhost")
+ * instead of a host:port pair.
+ */
+ @Test
+ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+ final Properties properties = configProps();
+ final String myEndPoint = "localhost";
+ properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+ final StreamsConfig config = new StreamsConfig(properties);
+ final UUID uuid1 = UUID.randomUUID();
+ final String client1 = "client1";
+ final TopologyBuilder builder = new TopologyBuilder();
+ final String applicationId = "application-id";
+ builder.setApplicationId(applicationId);
+
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+ final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
+ new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+ final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+
+ // Only configure() sits inside the try block, so a ConfigException raised by the
+ // setup above is not mistaken for the behaviour under test.
+ try {
+ partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+ Assert.fail("expected an exception due to invalid config");
+ } catch (ConfigException expected) {
+ // expected: the endpoint has no port component
+ }
+ }
+
+ /**
+ * Configuring the assignor must fail with a {@code ConfigException} when the port part of
+ * {@code StreamsConfig.APPLICATION_SERVER_CONFIG} is not numeric ("localhost:j87yhk").
+ */
+ @Test
+ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+ final Properties properties = configProps();
+ final String myEndPoint = "localhost:j87yhk";
+ properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+ final StreamsConfig config = new StreamsConfig(properties);
+ final UUID uuid1 = UUID.randomUUID();
+ final String client1 = "client1";
+ final TopologyBuilder builder = new TopologyBuilder();
+ final String applicationId = "application-id";
+ builder.setApplicationId(applicationId);
+
+ final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+ final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
+ new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+ final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+
+ // Only configure() sits inside the try block, so a ConfigException raised by the
+ // setup above is not mistaken for the behaviour under test.
+ try {
+ partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+ Assert.fail("expected an exception due to invalid config");
+ } catch (ConfigException expected) {
+ // expected: the port component is not an integer
+ }
+ }
+
+ /**
+ * The host-to-partitions map encoded in an assignment's user data must be available from
+ * {@code getPartitionsByHostState()} once {@code onAssignment} has been called.
+ */
+ @Test
+ public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+ final TopicPartition partition = new TopicPartition("topic", 0);
+ final Map<HostInfo, Set<TopicPartition>> expectedHostState =
+ Collections.singletonMap(new HostInfo("localhost", 80), Collections.singleton(partition));
+
+ // One active task, no standby tasks, and the host state under test.
+ final AssignmentInfo info = new AssignmentInfo(
+ Collections.singletonList(new TaskId(0, 0)),
+ Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+ expectedHostState);
+ final List<TopicPartition> assignedPartitions = Arrays.asList(partition);
+
+ final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
+ assignor.onAssignment(new PartitionAssignor.Assignment(assignedPartitions, info.encode()));
+
+ assertEquals(expectedHostState, assignor.getPartitionsByHostState());
+ }
+
private class MockInternalTopicManager extends InternalTopicManager {
public Map<String, Integer> readyTopics = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d1aaa07..3a90ce3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -159,7 +159,7 @@ public class StreamThreadTest {
builder.addSource("source3", "topic3");
builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
- StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime()) {
+ StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
ProcessorTopology topology = builder.build("X", id.topicGroupId);
@@ -278,7 +278,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
- StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime) {
+ StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
@Override
public void maybeClean() {
super.maybeClean();
@@ -397,7 +397,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
- StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime) {
+ StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
@Override
public void maybeCommit() {
super.maybeCommit();
@@ -469,7 +469,7 @@ public class StreamThreadTest {
StreamsConfig config = new StreamsConfig(configProps());
MockClientSupplier clientSupplier = new MockClientSupplier();
StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
- clientId, processId, new Metrics(), new MockTime());
+ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
assertSame(clientSupplier.producer, thread.producer);
assertSame(clientSupplier.consumer, thread.consumer);
assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
new file mode 100644
index 0000000..d110277
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StreamsMetadata;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link StreamsMetadataState}.
+ * <p>
+ * Every test starts from the fixture built in {@link #before()}: a topology with four
+ * counted stores ("table-one", "table-two", "table-three", "merged-table"), one stream on
+ * "topic-four" that has no store, and a host-to-partitions mapping covering three hosts.
+ */
+public class StreamsMetadataStateTest {
+
+ private StreamsMetadataState discovery;
+ private HostInfo hostOne;
+ private HostInfo hostTwo;
+ private HostInfo hostThree;
+ private TopicPartition topic1P0;
+ private TopicPartition topic2P0;
+ private TopicPartition topic3P0;
+ private Map<HostInfo, Set<TopicPartition>> hostToPartitions;
+ private KStreamBuilder builder;
+ private TopicPartition topic1P1;
+ private TopicPartition topic2P1;
+ private TopicPartition topic4P0;
+ private List<PartitionInfo> partitionInfos;
+ private Cluster cluster;
+
+ /**
+ * Builds the topology, the host-to-partitions mapping and the cluster metadata, then
+ * hands the mapping and cluster to a fresh {@link StreamsMetadataState} via
+ * {@code onChange}.
+ */
+ @Before
+ public void before() {
+ builder = new KStreamBuilder();
+ final KStream<Object, Object> one = builder.stream("topic-one");
+ one.groupByKey().count("table-one");
+
+ final KStream<Object, Object> two = builder.stream("topic-two");
+ two.groupByKey().count("table-two");
+
+ builder.stream("topic-three")
+ .groupByKey()
+ .count("table-three");
+
+ // "merged-table" is fed by both topic-one and topic-two.
+ builder.merge(one, two).groupByKey().count("merged-table");
+
+ // topic-four is only mapped; no store name is attached to it.
+ builder.stream("topic-four").mapValues(new ValueMapper<Object, Object>() {
+ @Override
+ public Object apply(final Object value) {
+ return value;
+ }
+ });
+
+ builder.setApplicationId("appId");
+
+ topic1P0 = new TopicPartition("topic-one", 0);
+ topic1P1 = new TopicPartition("topic-one", 1);
+ topic2P0 = new TopicPartition("topic-two", 0);
+ topic2P1 = new TopicPartition("topic-two", 1);
+ topic3P0 = new TopicPartition("topic-three", 0);
+ topic4P0 = new TopicPartition("topic-four", 0);
+
+ hostOne = new HostInfo("host-one", 8080);
+ hostTwo = new HostInfo("host-two", 9090);
+ hostThree = new HostInfo("host-three", 7070);
+ hostToPartitions = new HashMap<>();
+ hostToPartitions.put(hostOne, Utils.mkSet(topic1P0, topic2P1, topic4P0));
+ hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1));
+ hostToPartitions.put(hostThree, Collections.singleton(topic3P0));
+
+ partitionInfos = Arrays.asList(
+ new PartitionInfo("topic-one", 0, null, null, null),
+ new PartitionInfo("topic-one", 1, null, null, null),
+ new PartitionInfo("topic-two", 0, null, null, null),
+ new PartitionInfo("topic-two", 1, null, null, null),
+ new PartitionInfo("topic-three", 0, null, null, null),
+ new PartitionInfo("topic-four", 0, null, null, null));
+
+ cluster = new Cluster(Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet());
+ discovery = new StreamsMetadataState(builder);
+ discovery.onChange(hostToPartitions, cluster);
+ }
+
+ /** getAllMetadata() must report one entry per fixture host with its stores and partitions. */
+ @Test
+ public void shouldGetAllStreamInstances() throws Exception {
+ final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+ Utils.mkSet(topic1P0, topic2P1, topic4P0));
+ final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+ Utils.mkSet(topic2P0, topic1P1));
+ final StreamsMetadata three = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+ Collections.singleton(topic3P0));
+
+ Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+ assertEquals(3, actual.size());
+ assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
+ assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
+ assertTrue("expected " + actual + " to contain " + three, actual.contains(three));
+ }
+
+ /**
+ * A host whose only partition belongs to a stream without a store (topic-five, filtered
+ * to "some-other-topic") must still be reported, with an empty set of store names.
+ */
+ @Test
+ public void shouldGetAllStreamsInstancesWithNoStores() throws Exception {
+ builder.stream("topic-five").filter(new Predicate<Object, Object>() {
+ @Override
+ public boolean test(final Object key, final Object value) {
+ return true;
+ }
+ }).to("some-other-topic");
+
+ final TopicPartition tp5 = new TopicPartition("topic-five", 1);
+ final HostInfo hostFour = new HostInfo("host-four", 8080);
+ hostToPartitions.put(hostFour, Utils.mkSet(tp5));
+
+ discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));
+
+ final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.<String>emptySet(),
+ Collections.singleton(tp5));
+ final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+ assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));
+ }
+
+ /** getAllMetadataForStore("table-one") must return exactly host-one and host-two. */
+ @Test
+ public void shouldGetInstancesForStoreName() throws Exception {
+ final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+ Utils.mkSet(topic1P0, topic2P1, topic4P0));
+ final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+ Utils.mkSet(topic2P0, topic1P1));
+ final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("table-one");
+ assertEquals(2, actual.size());
+ assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
+ assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
+ }
+
+ /** A null store name passed to getAllMetadataForStore must raise NullPointerException. */
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() throws Exception {
+ discovery.getAllMetadataForStore(null);
+ }
+
+ /** An unknown store name must yield an empty collection rather than null. */
+ @Test
+ public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() throws Exception {
+ final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("not-a-store");
+ assertTrue(actual.isEmpty());
+ }
+
+ /**
+ * Looks up the host for "the-key" in "table-three" using a key serializer, after
+ * host-two has been given a second topic-three partition.
+ */
+ @Test
+ public void shouldGetInstanceWithKey() throws Exception {
+ final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+ hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
+
+ discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+
+ // NOTE(review): expecting host-three presumes "the-key" resolves to topic-three
+ // partition 0 under the serializer-based partitioning - confirm if this ever fails.
+ final StreamsMetadata expected = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+ Collections.singleton(topic3P0));
+
+ final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key",
+ Serdes.String().serializer());
+
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * With a partitioner that always answers partition 1, the lookup in "table-three" must
+ * return host-two, which was given topic-three partition 1.
+ */
+ @Test
+ public void shouldGetInstanceWithKeyAndCustomPartitioner() throws Exception {
+ final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+ hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
+
+ discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+
+ final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-three", "merged-table"),
+ Utils.mkSet(topic2P0, tp4));
+
+ StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", new StreamPartitioner<String, Object>() {
+ @Override
+ public Integer partition(final String key, final Object value, final int numPartitions) {
+ return 1;
+ }
+ });
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * For the store fed by the merged topic-one/topic-two stream, a partitioner answering
+ * partition 2 must resolve to host-two, which was given topic-two partition 2.
+ */
+ @Test
+ public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception {
+ final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
+ hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2));
+ discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
+
+ final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+ Utils.mkSet(topic2P0, topic1P1, topic2P2));
+
+ final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
+ @Override
+ public Integer partition(final String key, final Object value, final int numPartitions) {
+ return 2;
+ }
+ });
+
+ assertEquals(expected, actual);
+
+ }
+
+ /** A key lookup against an unknown store must return null. */
+ @Test
+ public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() throws Exception {
+ final StreamsMetadata actual = discovery.getMetadataWithKey("not-a-store",
+ "key",
+ Serdes.String().serializer());
+ assertNull(actual);
+ }
+
+ /** A null key must raise NullPointerException. */
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowWhenKeyIsNull() throws Exception {
+ discovery.getMetadataWithKey("table-three", null, Serdes.String().serializer());
+ }
+
+ /** A null serializer must raise NullPointerException. */
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowWhenSerializerIsNull() throws Exception {
+ discovery.getMetadataWithKey("table-three", "key", (Serializer) null);
+ }
+
+ /** A null store name must raise NullPointerException on the serializer-based lookup. */
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfStoreNameIsNull() throws Exception {
+ discovery.getMetadataWithKey(null, "key", Serdes.String().serializer());
+ }
+
+ /**
+ * A null partitioner must raise NullPointerException. The store name and key are valid
+ * so that the null partitioner is the only argument that can trigger the exception.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(expected = NullPointerException.class)
+ public void shouldThrowIfStreamPartitionerIsNull() throws Exception {
+ discovery.getMetadataWithKey("table-three", "key", (StreamPartitioner) null);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
deleted file mode 100644
index 14a7f9a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class AssginmentInfoTest {
-
- @Test
- public void testEncodeDecode() {
- List<TaskId> activeTasks =
- Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
- Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
- standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
- standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
-
- AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks);
- AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
-
- assertEquals(info, decoded);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
new file mode 100644
index 0000000..ce94a23
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class AssignmentInfoTest {
+
+ @Test
+ public void testEncodeDecode() {
+ List<TaskId> activeTasks =
+ Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+ standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+ standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+
+ AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
+ AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
+
+ assertEquals(info, decoded);
+ }
+
+ @Test
+ public void shouldDecodePreviousVersion() throws Exception {
+ List<TaskId> activeTasks =
+ Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+ standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+ standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+ final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
+ final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
+ assertEquals(oldVersion.activeTasks, decoded.activeTasks);
+ assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
+ assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
+ assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+ }
+
+
+ /**
+ * This is a clone of what the V1 encoding did. The encode method has changed for V2
+ * so it is impossible to test compatibility without having this
+ */
+ private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ // Encode version
+ out.writeInt(oldVersion.version);
+ // Encode active tasks
+ out.writeInt(oldVersion.activeTasks.size());
+ for (TaskId id : oldVersion.activeTasks) {
+ id.writeTo(out);
+ }
+ // Encode standby tasks
+ out.writeInt(oldVersion.standbyTasks.size());
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ id.writeTo(out);
+
+ Set<TopicPartition> partitions = entry.getValue();
+ out.writeInt(partitions.size());
+ for (TopicPartition partition : partitions) {
+ out.writeUTF(partition.topic());
+ out.writeInt(partition.partition());
+ }
+ }
+
+ out.flush();
+ out.close();
+
+ return ByteBuffer.wrap(baos.toByteArray());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 3119bee..cf6d8c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -20,12 +20,15 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Test;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
public class SubscriptionInfoTest {
@@ -38,10 +41,62 @@ public class SubscriptionInfoTest {
Set<TaskId> standbyTasks =
new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
- SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks);
+ SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, null);
SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(info, decoded);
}
+ @Test
+ public void shouldEncodeDecodeWithUserEndPoint() throws Exception {
+ SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(),
+ Collections.singleton(new TaskId(0, 0)), Collections.<TaskId>emptySet(), "localhost:80");
+ SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode());
+ assertEquals(original, decoded);
+ }
+
+ @Test
+ public void shouldBeBackwardCompatible() throws Exception {
+ UUID processId = UUID.randomUUID();
+
+ Set<TaskId> activeTasks =
+ new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
+ Set<TaskId> standbyTasks =
+ new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+
+ final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
+ final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
+ assertEquals(activeTasks, decode.prevTasks);
+ assertEquals(standbyTasks, decode.standbyTasks);
+ assertEquals(processId, decode.processId);
+ assertNull(decode.userEndPoint);
+
+ }
+
+
+ /**
+ * This is a clone of what the V1 encoding did. The encode method has changed for V2
+ * so it is impossible to test compatibility without having this
+ */
+ private ByteBuffer encodePreviousVersion(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+ ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
+ // version
+ buf.putInt(1);
+ // encode client UUID
+ buf.putLong(processId.getMostSignificantBits());
+ buf.putLong(processId.getLeastSignificantBits());
+ // encode ids of previously running tasks
+ buf.putInt(prevTasks.size());
+ for (TaskId id : prevTasks) {
+ id.writeTo(buf);
+ }
+ // encode ids of cached tasks
+ buf.putInt(standbyTasks.size());
+ for (TaskId id : standbyTasks) {
+ id.writeTo(buf);
+ }
+ buf.rewind();
+
+ return buf;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a112a5a..0416e40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -66,7 +67,7 @@ public class StreamThreadStateStoreProviderTest {
public void before() throws IOException {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("the-source", "the-source");
- builder.addProcessor("the-processor", new MockProcessorSupplier());
+ builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
builder.addStateStore(Stores.create("kv-store")
.withStringKeys()
.withStringValues().inMemory().build(), "the-processor");
@@ -106,7 +107,7 @@ public class StreamThreadStateStoreProviderTest {
thread = new StreamThread(builder, streamsConfig, clientSupplier,
applicationId,
"clientId", UUID.randomUUID(), new Metrics(),
- new SystemTime()) {
+ new SystemTime(), new StreamsMetadataState(builder)) {
@Override
public Map<TaskId, StreamTask> tasks() {
return tasks;
[2/2] kafka git commit: KAFKA-3914: Global discovery of state stores
Posted by gu...@apache.org.
KAFKA-3914: Global discovery of state stores
guozhangwang enothereska mjsax miguno please take a look. A few things that need to be clarified
1. I've added StreamsConfig.USER_ENDPOINT_CONFIG, but should we have separate configs for host and port or is this one config ok?
2. `HostState` in the KIP has a byte[] field - not sure why and what it would be populated with
3. I've changed the API to return `List<KafkaStreamsInstance>` as opposed to `Map<HostInfo, Set<TaskMetadata>>` as I find this far more intuitive to work with.
Author: Damian Guy <da...@gmail.com>
Reviewers: Matthias J. Sax, Michael G. Noll, Eno Thereska, Guozhang Wang
Closes #1576 from dguy/kafka-3914v2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68b5d014
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68b5d014
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68b5d014
Branch: refs/heads/trunk
Commit: 68b5d014fa3bf2e18da253ded2bbcd4f5d4a9d8d
Parents: caa9bd0
Author: Damian Guy <da...@gmail.com>
Authored: Wed Aug 10 14:25:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Aug 10 14:25:23 2016 -0700
----------------------------------------------------------------------
.../producer/internals/DefaultPartitioner.java | 22 +-
.../org/apache/kafka/common/utils/Utils.java | 16 ++
.../org/apache/kafka/streams/KafkaStreams.java | 94 ++++++-
.../org/apache/kafka/streams/StreamsConfig.java | 10 +
.../internals/WindowedStreamPartitioner.java | 5 +-
.../streams/processor/TopologyBuilder.java | 55 +++-
.../internals/DefaultStreamPartitioner.java | 43 +++
.../internals/StreamPartitionAssignor.java | 103 +++++++-
.../processor/internals/StreamThread.java | 10 +-
.../internals/StreamsMetadataState.java | 237 +++++++++++++++++
.../internals/assignment/AssignmentInfo.java | 155 ++++++-----
.../internals/assignment/SubscriptionInfo.java | 91 ++++---
.../apache/kafka/streams/state/HostInfo.java | 81 ++++++
.../kafka/streams/state/StreamsMetadata.java | 93 +++++++
.../apache/kafka/streams/KafkaStreamsTest.java | 43 ++-
.../integration/RegexSourceIntegrationTest.java | 3 +-
.../streams/kstream/KStreamBuilderTest.java | 51 ++++
.../streams/processor/TopologyBuilderTest.java | 35 +++
.../internals/StreamPartitionAssignorTest.java | 185 +++++++++++--
.../processor/internals/StreamThreadTest.java | 8 +-
.../internals/StreamsMetadataStateTest.java | 262 +++++++++++++++++++
.../assignment/AssginmentInfoTest.java | 50 ----
.../assignment/AssignmentInfoTest.java | 107 ++++++++
.../assignment/SubscriptionInfoTest.java | 57 +++-
.../StreamThreadStateStoreProviderTest.java | 5 +-
25 files changed, 1599 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index f81c496..241e809 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -37,22 +37,6 @@ public class DefaultPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
- /**
- * A cheap way to deterministically convert a number to a positive value. When the input is
- * positive, the original value is returned. When the input number is negative, the returned
- * positive value is the original value bit AND against 0x7fffffff which is not its absolutely
- * value.
- *
- * Note: changing this method in the future will possibly cause partition selection not to be
- * compatible with the existing messages already placed on a partition.
- *
- * @param number a given number
- * @return a positive number.
- */
- private static int toPositive(int number) {
- return number & 0x7fffffff;
- }
-
public void configure(Map<String, ?> configs) {}
/**
@@ -72,15 +56,15 @@ public class DefaultPartitioner implements Partitioner {
int nextValue = counter.getAndIncrement();
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
- int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
+ int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
- return DefaultPartitioner.toPositive(nextValue) % numPartitions;
+ return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
- return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e740618..4629baf 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -699,4 +699,20 @@ public class Utils {
throw exception;
}
+ /**
+ * A cheap way to deterministically convert a number to a non-negative value. When the input is
+ * non-negative, the original value is returned. When the input number is negative, the returned
+ * value is the original value bit AND against 0x7fffffff, which is not its absolute
+ * value.
+ *
+ * Note: changing this method in the future will possibly cause partition selection not to be
+ * compatible with the existing messages already placed on a partition since it is used
+ * in producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}
+ *
+ * @param number a given number
+ * @return a non-negative number (the input with its sign bit cleared).
+ */
+ public static int toPositive(int number) {
+ return number & 0x7fffffff;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b9553c9..4fabdf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -22,28 +22,34 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
* sends output to zero or more output topics.
@@ -104,6 +110,7 @@ public class KafkaStreams {
// of the co-location of stream thread's consumers. It is for internal
// usage only and should not be exposed to users at all.
private final UUID processId;
+ private StreamsMetadataState streamsMetadataState;
private final StreamsConfig config;
@@ -164,11 +171,19 @@ public class KafkaStreams {
this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
+ streamsMetadataState = new StreamsMetadataState(builder);
for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time);
+ this.threads[i] = new StreamThread(builder,
+ config,
+ clientSupplier,
+ applicationId,
+ clientId,
+ processId,
+ metrics,
+ time,
+ streamsMetadataState);
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
-
this.queryableStoreProvider = new QueryableStoreProvider(storeProviders);
}
@@ -274,6 +289,77 @@ public class KafkaStreams {
thread.setUncaughtExceptionHandler(eh);
}
+
+ /**
+ * Find all of the instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to
+ *
+ * Note: this is a point in time view and it may change due to partition reassignment.
+ * @return collection containing all instances of {@link StreamsMetadata} in the {@link KafkaStreams} application that this instance belongs to
+ */
+ public Collection<StreamsMetadata> allMetadata() {
+ validateIsRunning();
+ return streamsMetadataState.getAllMetadata();
+ }
+
+
+ /**
+ * Find instances of {@link StreamsMetadata} that contain the given storeName
+ *
+ * Note: this is a point in time view and it may change due to partition reassignment.
+ * @param storeName the storeName to find metadata for
+ * @return A collection containing instances of {@link StreamsMetadata} that have the provided storeName
+ */
+ public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
+ validateIsRunning();
+ return streamsMetadataState.getAllMetadataForStore(storeName);
+ }
+
+ /**
+ * Find the {@link StreamsMetadata} instance that contains the given storeName
+ * and the corresponding hosted store instance contains the given key. This will use
+ * the {@link org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner} to
+ * locate the partition. If a custom partitioner has been used please use
+ * {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}
+ *
+ * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+ * this method provides a way of finding which host it would exist on.
+ *
+ * Note: this is a point in time view and it may change due to partition reassignment.
+ * @param storeName Name of the store
+ * @param key Key used to locate the partition
+ * @param keySerializer Serializer for the key
+ * @param <K> key type
+ * @return The {@link StreamsMetadata} for the storeName and key
+ */
+ public <K> StreamsMetadata metadataForKey(final String storeName,
+ final K key,
+ final Serializer<K> keySerializer) {
+ validateIsRunning();
+ return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
+ }
+
+ /**
+ * Find the {@link StreamsMetadata} instance that contains the given storeName
+ * and the corresponding hosted store instance contains the given key
+ *
+ * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+ * this method provides a way of finding which host it would exist on.
+ *
+ * Note: this is a point in time view and it may change due to partition reassignment.
+ * @param storeName Name of the store
+ * @param key Key used to locate the partition
+ * @param partitioner Partitioner for the store
+ * @param <K> key type
+ * @return The {@link StreamsMetadata} for the storeName and key
+ */
+ public <K> StreamsMetadata metadataForKey(final String storeName,
+ final K key,
+ final StreamPartitioner<K, ?> partitioner) {
+ validateIsRunning();
+ return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
+ }
+
+
/**
* Get a facade wrapping the {@link org.apache.kafka.streams.processor.StateStore} instances
* with the provided storeName and accepted by {@link QueryableStoreType#accepts(StateStore)}.
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b624e0e..41498cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -109,6 +109,10 @@ public class StreamsConfig extends AbstractConfig {
public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
public static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
+ /** <code>application.server</code> */
+ public static final String APPLICATION_SERVER_CONFIG = "application.server";
+ public static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
+
/** <code>metrics.sample.window.ms</code> */
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
@@ -225,6 +229,11 @@ public class StreamsConfig extends AbstractConfig {
atLeast(1),
Importance.LOW,
CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+ .define(APPLICATION_SERVER_CONFIG,
+ Type.STRING,
+ "",
+ Importance.LOW,
+ APPLICATION_SERVER_DOC)
.define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Type.CLASS,
null,
@@ -319,6 +328,7 @@ public class StreamsConfig extends AbstractConfig {
if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals(""))
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
+ props.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
return props;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 1e30864..ba9873b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import static org.apache.kafka.common.utils.Utils.toPositive;
+
public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
private final WindowedSerializer<K> serializer;
@@ -45,8 +47,5 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
- private static int toPositive(int number) {
- return number & 0x7fffffff;
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 8e3dc7a..7b79236 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -68,6 +68,7 @@ public class TopologyBuilder {
private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+ private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();
private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
private String applicationId;
@@ -616,12 +617,40 @@ public class TopologyBuilder {
NodeFactory nodeFactory = nodeFactories.get(processorName);
if (nodeFactory instanceof ProcessorNodeFactory) {
- ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
+ ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory;
+ processorNodeFactory.addStateStore(stateStoreName);
+ connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory);
} else {
throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
}
}
+
+ private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
+ final Set<String> sourceTopics = new HashSet<>();
+ for (String parent : parents) {
+ NodeFactory nodeFactory = nodeFactories.get(parent);
+ if (nodeFactory instanceof SourceNodeFactory) {
+ sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
+ } else if (nodeFactory instanceof ProcessorNodeFactory) {
+ sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
+ }
+ }
+ return sourceTopics;
+ }
+
+ private void connectStateStoreNameToSourceTopics(final String stateStoreName,
+ final ProcessorNodeFactory processorNodeFactory) {
+
+ final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
+ if (sourceTopics.isEmpty()) {
+ throw new TopologyBuilderException("can't find source topic for state store " +
+ stateStoreName);
+ }
+ stateStoreNameToSourceTopics.put(stateStoreName,
+ Collections.unmodifiableSet(sourceTopics));
+ }
+
/**
* Returns the map of topic groups keyed by the group id.
* A topic group is a group of topics in the same task.
@@ -875,20 +904,25 @@ public class TopologyBuilder {
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
*/
public synchronized Set<String> sourceTopics() {
+ Set<String> topics = maybeDecorateInternalSourceTopics(sourceTopicNames);
+ return Collections.unmodifiableSet(topics);
+ }
+
+ private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopicNames) {
Set<String> topics = new HashSet<>();
for (String topic : sourceTopicNames) {
if (internalTopicNames.contains(topic)) {
if (applicationId == null) {
throw new TopologyBuilderException("there are internal topics and "
+ "applicationId is null. Call "
- + "setApplicationId before sourceTopics");
+ + "setApplicationId first");
}
topics.add(applicationId + "-" + topic);
} else {
topics.add(topic);
}
}
- return Collections.unmodifiableSet(topics);
+ return topics;
}
public synchronized Pattern sourceTopicPattern() {
@@ -917,7 +951,8 @@ public class TopologyBuilder {
/**
* Set the applicationId. This is required before calling
- * {@link #sourceTopics}, {@link #topicGroups} and {@link #copartitionSources}
+ * {@link #sourceTopics}, {@link #topicGroups}, {@link #copartitionSources}, and
+ * {@link #stateStoreNameToSourceTopics}
* @param applicationId the streams applicationId. Should be the same as set by
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
*/
@@ -925,4 +960,16 @@ public class TopologyBuilder {
Objects.requireNonNull(applicationId, "applicationId can't be null");
this.applicationId = applicationId;
}
+
+    /**
+     * Returns, for every state store that has been connected to a processor, the source
+     * topics that feed it. Internal topic names are returned prefixed with the
+     * applicationId, in the same way as {@link #sourceTopics}.
+     * <p>
+     * The method is synchronized like the other accessors of this builder because it
+     * reads state that is modified while the topology is being built. The result is a
+     * snapshot: later changes to the builder are not reflected in it.
+     *
+     * @return an unmodifiable mapping from state store name to an unmodifiable Set of source Topics.
+     * @throws TopologyBuilderException if a store is fed by an internal topic and the
+     *                                  applicationId has not been set
+     */
+    public synchronized Map<String, Set<String>> stateStoreNameToSourceTopics() {
+        final Map<String, Set<String>> results = new HashMap<>();
+        for (Map.Entry<String, Set<String>> entry : stateStoreNameToSourceTopics.entrySet()) {
+            final Set<String> decorated = maybeDecorateInternalSourceTopics(entry.getValue());
+            results.put(entry.getKey(), Collections.unmodifiableSet(decorated));
+        }
+        return Collections.unmodifiableMap(results);
+    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
new file mode 100644
index 0000000..006f010
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+/**
+ * A {@link StreamPartitioner} that picks the partition by handing the serialized key to
+ * the producer's {@link DefaultPartitioner} for a fixed topic and cluster.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
+
+    private final DefaultPartitioner delegate = new DefaultPartitioner();
+    private final Serializer<K> serializer;
+    private final Cluster clusterMetadata;
+    private final String topicName;
+
+    /**
+     * @param keySerializer serializer used to turn keys into the bytes that are partitioned on
+     * @param cluster       cluster metadata handed to the delegate partitioner
+     * @param topic         topic whose partitioning is applied
+     */
+    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) {
+        this.serializer = keySerializer;
+        this.clusterMetadata = cluster;
+        this.topicName = topic;
+    }
+
+    /**
+     * Serializes the key and asks the delegate partitioner for the partition.
+     * The {@code numPartitions} argument is not consulted by this implementation; the
+     * delegate is given the cluster metadata instead. No serialized value is passed on.
+     *
+     * @param key           key of the record
+     * @param value         value of the record
+     * @param numPartitions number of partitions (unused here)
+     * @return the partition chosen by the delegate partitioner
+     */
+    @Override
+    public Integer partition(final K key, final V value, final int numPartitions) {
+        return delegate.partition(topicName,
+                                  key,
+                                  serializer.serialize(topicName, key),
+                                  value,
+                                  null,
+                                  clusterMetadata);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 4b52511..fd70a01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,7 +24,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
@@ -33,7 +35,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +55,10 @@ import java.util.UUID;
public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+ private String userEndPointConfig;
+ private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
+ private Cluster metadataWithInternalTopics;
+
private static class AssignedPartition implements Comparable<AssignedPartition> {
public final TaskId taskId;
@@ -119,6 +125,25 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
streamThread = (StreamThread) o;
streamThread.partitionAssignor(this);
+ String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
+ if (userEndPoint != null && !userEndPoint.isEmpty()) {
+ final String[] hostPort = userEndPoint.split(":");
+ if (hostPort.length != 2) {
+ throw new ConfigException(String.format("Config %s isn't in the correct format. Expected a host:port pair" +
+ " but received %s",
+ StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+ } else {
+ try {
+ Integer.valueOf(hostPort[1]);
+ this.userEndPointConfig = userEndPoint;
+ } catch (NumberFormatException nfe) {
+ throw new ConfigException(String.format("Invalid port %s supplied in %s for config %s",
+ hostPort[1], userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
+ }
+ }
+
+ }
+
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
internalTopicManager = new InternalTopicManager(
(String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
@@ -143,7 +168,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Set<TaskId> prevTasks = streamThread.prevTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
standbyTasks.removeAll(prevTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
+ SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
return new Subscription(new ArrayList<>(topics), data.encode());
}
@@ -228,6 +253,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Map<UUID, Set<String>> consumersByClient = new HashMap<>();
Map<UUID, ClientState<TaskId>> states = new HashMap<>();
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+ Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
// decode subscription info
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
@@ -239,7 +265,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
-
+ if (info.userEndPoint != null) {
+ final String[] hostPort = info.userEndPoint.split(":");
+ consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
+ }
Set<String> consumers = consumersByClient.get(info.processId);
if (consumers == null) {
consumers = new HashSet<>();
@@ -327,7 +356,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
internalSourceTopicToTaskIds.clear();
- Cluster metadataWithInternalTopics = metadata;
+ metadataWithInternalTopics = metadata;
if (internalTopicManager != null)
metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
@@ -361,8 +390,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// assign tasks to clients
states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
- Map<String, Assignment> assignment = new HashMap<>();
+ final List<AssignmentSupplier> assignmentSuppliers = new ArrayList<>();
+
+ final Map<HostInfo, Set<TopicPartition>> endPointMap = new HashMap<>();
for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
UUID processId = entry.getKey();
Set<String> consumers = entry.getValue();
@@ -408,14 +439,24 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
for (AssignedPartition partition : assignedPartitions) {
active.add(partition.taskId);
activePartitions.add(partition.partition);
+ HostInfo hostInfo = consumerEndPointMap.get(processId);
+ if (hostInfo != null) {
+ if (!endPointMap.containsKey(hostInfo)) {
+ endPointMap.put(hostInfo, new HashSet<TopicPartition>());
+ }
+ final Set<TopicPartition> topicPartitions = endPointMap.get(hostInfo);
+ topicPartitions.add(partition.partition);
+ }
}
- AssignmentInfo data = new AssignmentInfo(active, standby);
- assignment.put(consumer, new Assignment(activePartitions, data.encode()));
- i++;
- active.clear();
- standby.clear();
+ assignmentSuppliers.add(new AssignmentSupplier(consumer,
+ active,
+ standby,
+ endPointMap,
+ activePartitions));
+
+ i++;
}
}
@@ -424,9 +465,39 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// change log topics should be compacted
prepareTopic(stateChangelogTopicToTaskIds, true /* compactTopic */, true);
+ Map<String, Assignment> assignment = new HashMap<>();
+ for (AssignmentSupplier assignmentSupplier : assignmentSuppliers) {
+ assignment.put(assignmentSupplier.consumer, assignmentSupplier.get());
+ }
return assignment;
}
+    /**
+     * Holds everything needed to build one consumer's {@link Assignment}, so that the
+     * assignment can be created later through {@link #get()}.
+     * <p>
+     * Building is deferred because all suppliers share one {@code endPointMap} that is
+     * still being filled while consumers are processed; {@link #get()} is only called
+     * after that loop has finished. The collections are held by reference, not copied,
+     * so changes made to them before {@link #get()} is called are encoded.
+     * <p>
+     * The class is a static nested class because it uses no state of the enclosing
+     * assignor.
+     */
+    static class AssignmentSupplier {
+        private final String consumer;
+        private final List<TaskId> active;
+        private final Map<TaskId, Set<TopicPartition>> standby;
+        private final Map<HostInfo, Set<TopicPartition>> endPointMap;
+        private final List<TopicPartition> activePartitions;
+
+        /**
+         * @param consumer         id of the consumer the assignment is for
+         * @param active           active task ids assigned to the consumer
+         * @param standby          standby tasks and their partitions assigned to the consumer
+         * @param endPointMap      partitions per host, shared between all suppliers
+         * @param activePartitions partitions of the active tasks assigned to the consumer
+         */
+        AssignmentSupplier(final String consumer,
+                           final List<TaskId> active,
+                           final Map<TaskId, Set<TopicPartition>> standby,
+                           final Map<HostInfo, Set<TopicPartition>> endPointMap,
+                           final List<TopicPartition> activePartitions) {
+            this.consumer = consumer;
+            this.active = active;
+            this.standby = standby;
+            this.endPointMap = endPointMap;
+            this.activePartitions = activePartitions;
+        }
+
+        /**
+         * Encodes the captured state as it is at the time of the call.
+         *
+         * @return the assignment for {@code consumer}
+         */
+        Assignment get() {
+            final AssignmentInfo info = new AssignmentInfo(active, standby, endPointMap);
+            return new Assignment(activePartitions, info.encode());
+        }
+    }
+
/**
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@@ -460,6 +531,18 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
}
this.partitionToTaskIds = partitionToTaskIds;
+ this.partitionsByHostState = info.partitionsByHostState;
+ }
+
+    /**
+     * Gives read-only access to the host-to-partitions mapping that was taken from the
+     * most recently received assignment.
+     *
+     * @return an unmodifiable view of the stored mapping, or an empty map if none has
+     *         been stored yet; never null
+     */
+    public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
+        if (partitionsByHostState != null) {
+            return Collections.unmodifiableMap(partitionsByHostState);
+        }
+        return Collections.emptyMap();
+    }
+
+ /**
+ * Returns the cluster metadata currently held in {@code metadataWithInternalTopics}.
+ * NOTE(review): the field is declared without an initializer, so this presumably
+ * returns null until the assignment logic has stored a value - confirm with callers.
+ *
+ * @return the stored {@link Cluster} metadata
+ */
+ public Cluster clusterMetadata() {
+ return metadataWithInternalTopics;
}
private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c84cae0..f416443 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@ public class StreamThread extends Thread {
private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
public final PartitionGrouper partitionGrouper;
+ private final StreamsMetadataState streamsMetadataState;
public final String applicationId;
public final String clientId;
public final UUID processId;
@@ -110,6 +112,7 @@ public class StreamThread extends Thread {
addStreamTasks(assignment);
addStandbyTasks();
lastClean = time.milliseconds(); // start the cleaning cycle
+ streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata());
} catch (Throwable t) {
rebalanceException = t;
throw t;
@@ -127,6 +130,7 @@ public class StreamThread extends Thread {
} finally {
// TODO: right now upon partition revocation, we always remove all the tasks;
// this behavior can be optimized to only remove affected tasks in the future
+ streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
removeStreamTasks();
removeStandbyTasks();
}
@@ -140,7 +144,8 @@ public class StreamThread extends Thread {
String clientId,
UUID processId,
Metrics metrics,
- Time time) {
+ Time time,
+ StreamsMetadataState streamsMetadataState) {
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
this.applicationId = applicationId;
@@ -151,6 +156,7 @@ public class StreamThread extends Thread {
this.clientId = clientId;
this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+ this.streamsMetadataState = streamsMetadataState;
// set the producer and consumer clients
String threadName = getName();
@@ -500,6 +506,8 @@ public class StreamThread extends Thread {
return tasks;
}
+
+
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
new file mode 100644
index 0000000..eeb3bc9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StreamsMetadata;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+
+/**
+ * Provides access to the {@link StreamsMetadata} in a KafkaStreams application. This can be used
+ * to discover the locations of {@link org.apache.kafka.streams.processor.StateStore}s
+ * in a KafkaStreams application.
+ * <p>
+ * Every public method is synchronized on this instance: the metadata is rebuilt by
+ * {@link #onChange(Map, Cluster)} and read by the lookup methods.
+ */
+public class StreamsMetadataState {
+    private final TopologyBuilder builder;
+    // cleared and refilled in place on every onChange call; never handed out directly
+    private final List<StreamsMetadata> allMetadata = new ArrayList<>();
+    // stays null until onChange supplies a cluster
+    private Cluster clusterMetadata;
+
+    public StreamsMetadataState(final TopologyBuilder builder) {
+        this.builder = builder;
+    }
+
+    /**
+     * Find all of the {@link StreamsMetadata}s in a
+     * {@link KafkaStreams} application
+     *
+     * @return an unmodifiable snapshot of all the {@link StreamsMetadata}s in a
+     *         {@link KafkaStreams} application; later rebalances do not change it
+     */
+    public synchronized Collection<StreamsMetadata> getAllMetadata() {
+        // Copy rather than expose the internal list: onChange clears and refills that list,
+        // which would otherwise be visible to (and break iteration in) callers.
+        return Collections.unmodifiableList(new ArrayList<>(allMetadata));
+    }
+
+    /**
+     * Find all of the {@link StreamsMetadata}s for a given storeName
+     *
+     * @param storeName the storeName to find metadata for
+     * @return A collection of {@link StreamsMetadata} that have the provided storeName;
+     *         empty if the store is unknown to the topology or not hosted anywhere
+     */
+    public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
+        Objects.requireNonNull(storeName, "storeName cannot be null");
+
+        final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        if (sourceTopics == null) {
+            return Collections.emptyList();
+        }
+
+        final List<StreamsMetadata> results = new ArrayList<>();
+        for (StreamsMetadata metadata : allMetadata) {
+            if (metadata.stateStoreNames().contains(storeName)) {
+                results.add(metadata);
+            }
+        }
+        return results;
+    }
+
+    /**
+     * Find the {@link StreamsMetadata}s for a given storeName and key. This method will use the
+     * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used
+     * please use {@link StreamsMetadataState#getMetadataWithKey(String, Object, StreamPartitioner)}
+     *
+     * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
+     * this method provides a way of finding which {@link StreamsMetadata} it would exist on.
+     *
+     * @param storeName     Name of the store
+     * @param key           Key to use
+     * @param keySerializer Serializer for the key
+     * @param <K>           key type
+     * @return The {@link StreamsMetadata} for the storeName and key, or null if the store is
+     *         unknown, no cluster or partition metadata is available yet, or no host matches
+     */
+    public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
+                                                               final K key,
+                                                               final Serializer<K> keySerializer) {
+        Objects.requireNonNull(keySerializer, "keySerializer can't be null");
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(key, "key can't be null");
+
+        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
+        if (sourceTopicsInfo == null) {
+            return null;
+        }
+        if (sourceTopicsInfo.topicWithMostPartitions == null) {
+            // none of the source topics has partition metadata, so there is no topic
+            // the default partitioner could be applied to
+            return null;
+        }
+        return getStreamsMetadataForKey(storeName,
+                                        key,
+                                        new DefaultStreamPartitioner<>(keySerializer,
+                                                                       clusterMetadata,
+                                                                       sourceTopicsInfo.topicWithMostPartitions),
+                                        sourceTopicsInfo);
+    }
+
+    /**
+     * Find the {@link StreamsMetadata}s for a given storeName and key.
+     *
+     * Note: the key may not exist in the {@link StateStore},
+     * this method provides a way of finding which {@link StreamsMetadata} it would exist on.
+     *
+     * @param storeName   Name of the store
+     * @param key         Key to use
+     * @param partitioner partitioner to use to find correct partition for key
+     * @param <K>         key type
+     * @return The {@link StreamsMetadata} for the storeName and key, or null if the store is
+     *         unknown, no cluster metadata is available yet, the partitioner returns no
+     *         partition, or no host matches
+     */
+    public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
+                                                               final K key,
+                                                               final StreamPartitioner<K, ?> partitioner) {
+        Objects.requireNonNull(storeName, "storeName can't be null");
+        Objects.requireNonNull(key, "key can't be null");
+        Objects.requireNonNull(partitioner, "partitioner can't be null");
+
+        final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
+        if (sourceTopicsInfo == null) {
+            return null;
+        }
+        return getStreamsMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
+    }
+
+    /**
+     * Respond to changes to the HostInfo -> TopicPartition mapping. Will rebuild the
+     * metadata
+     * @param currentState    the current mapping of {@link HostInfo} -> {@link TopicPartition}s
+     * @param clusterMetadata the current clusterMetadata {@link Cluster}
+     */
+    public synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> currentState, final Cluster clusterMetadata) {
+        this.clusterMetadata = clusterMetadata;
+        rebuildMetadata(currentState);
+    }
+
+    /**
+     * @return true if at least one of the host's partitions belongs to one of the given topics
+     */
+    private boolean hasPartitionsForAnyTopics(final Set<String> topicNames, final Set<TopicPartition> partitionForHost) {
+        for (TopicPartition topicPartition : partitionForHost) {
+            if (topicNames.contains(topicPartition.topic())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Replaces the contents of {@code allMetadata} with one entry per host. A store is
+     * listed for a host when the host owns a partition of any of the store's source topics.
+     */
+    private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> currentState) {
+        allMetadata.clear();
+        if (currentState.isEmpty()) {
+            return;
+        }
+        final Map<String, Set<String>> stores = builder.stateStoreNameToSourceTopics();
+        for (Map.Entry<HostInfo, Set<TopicPartition>> entry : currentState.entrySet()) {
+            final HostInfo key = entry.getKey();
+            final Set<TopicPartition> partitionsForHost = new HashSet<>(entry.getValue());
+            final Set<String> storesOnHost = new HashSet<>();
+            for (Map.Entry<String, Set<String>> storeTopicEntry : stores.entrySet()) {
+                final Set<String> topicsForStore = storeTopicEntry.getValue();
+                if (hasPartitionsForAnyTopics(topicsForStore, partitionsForHost)) {
+                    storesOnHost.add(storeTopicEntry.getKey());
+                }
+            }
+            allMetadata.add(new StreamsMetadata(key, storesOnHost, partitionsForHost));
+        }
+    }
+
+    /**
+     * Asks the partitioner for the key's partition and returns the first host that has
+     * the store and owns that partition of any of the store's source topics.
+     *
+     * @return the matching {@link StreamsMetadata}, or null if the partitioner returns
+     *         no partition or no host matches
+     */
+    private <K> StreamsMetadata getStreamsMetadataForKey(final String storeName,
+                                                         final K key,
+                                                         final StreamPartitioner<K, ?> partitioner,
+                                                         final SourceTopicsInfo sourceTopicsInfo) {
+
+        final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
+        if (partition == null) {
+            // a partitioner may decline to choose a partition; unboxing null below would throw
+            return null;
+        }
+        final Set<TopicPartition> matchingPartitions = new HashSet<>();
+        for (String sourceTopic : sourceTopicsInfo.sourceTopics) {
+            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
+        }
+
+        for (StreamsMetadata streamsMetadata : allMetadata) {
+            final Set<String> stateStoreNames = streamsMetadata.stateStoreNames();
+            final Set<TopicPartition> topicPartitions = new HashSet<>(streamsMetadata.topicPartitions());
+            topicPartitions.retainAll(matchingPartitions);
+            if (stateStoreNames.contains(storeName)
+                    && !topicPartitions.isEmpty()) {
+                return streamsMetadata;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return partition information for the store's source topics, or null if the store
+     *         has no source topics or no cluster metadata has been supplied yet
+     */
+    private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
+        if (clusterMetadata == null) {
+            // onChange has not supplied a cluster yet; SourceTopicsInfo needs it
+            return null;
+        }
+        final Set<String> sourceTopics = builder.stateStoreNameToSourceTopics().get(storeName);
+        if (sourceTopics == null || sourceTopics.isEmpty()) {
+            return null;
+        }
+        return new SourceTopicsInfo(sourceTopics);
+    }
+
+    /**
+     * The source topics of one store, together with the largest partition count among
+     * them and the topic that has it, as read from the current cluster metadata.
+     * {@code maxPartitions} stays 0 and {@code topicWithMostPartitions} stays null when
+     * the cluster metadata has no partitions for any of the topics.
+     */
+    private class SourceTopicsInfo {
+        private final Set<String> sourceTopics;
+        private int maxPartitions;
+        private String topicWithMostPartitions;
+
+        private SourceTopicsInfo(final Set<String> sourceTopics) {
+            this.sourceTopics = sourceTopics;
+            for (String topic : sourceTopics) {
+                final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic);
+                if (partitions != null && partitions.size() > maxPartitions) {
+                    maxPartitions = partitions.size();
+                    topicWithMostPartitions = partitions.get(0).topic();
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 0486e57..6569f85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import org.apache.kafka.common.record.ByteBufferInputStream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,19 +40,28 @@ import java.util.Set;
public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
-
+ /**
+ * A new field was added, partitionsByHostState. CURRENT_VERSION
+ * is required so we can decode the previous version. For example, this may occur
+ * during a rolling upgrade
+ */
+ private static final int CURRENT_VERSION = 2;
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
public final Map<TaskId, Set<TopicPartition>> standbyTasks;
+ public final Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
- public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
- this(1, activeTasks, standbyTasks);
+ public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+ Map<HostInfo, Set<TopicPartition>> hostState) {
+ this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
}
- protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks) {
+ protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+ Map<HostInfo, Set<TopicPartition>> hostState) {
this.version = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
+ this.partitionsByHostState = hostState;
}
/**
@@ -63,43 +73,48 @@ public class AssignmentInfo {
DataOutputStream out = new DataOutputStream(baos);
try {
- if (version == 1) {
- // Encode version
- out.writeInt(1);
- // Encode active tasks
- out.writeInt(activeTasks.size());
- for (TaskId id : activeTasks) {
- id.writeTo(out);
- }
- // Encode standby tasks
- out.writeInt(standbyTasks.size());
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
- TaskId id = entry.getKey();
- id.writeTo(out);
-
- Set<TopicPartition> partitions = entry.getValue();
- out.writeInt(partitions.size());
- for (TopicPartition partition : partitions) {
- out.writeUTF(partition.topic());
- out.writeInt(partition.partition());
- }
- }
-
- out.flush();
- out.close();
+ // Encode version
+ out.writeInt(version);
+ // Encode active tasks
+ out.writeInt(activeTasks.size());
+ for (TaskId id : activeTasks) {
+ id.writeTo(out);
+ }
+ // Encode standby tasks
+ out.writeInt(standbyTasks.size());
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+ TaskId id = entry.getKey();
+ id.writeTo(out);
+
+ Set<TopicPartition> partitions = entry.getValue();
+ writeTopicPartitions(out, partitions);
+ }
+ out.writeInt(partitionsByHostState.size());
+ for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHostState
+ .entrySet()) {
+ final HostInfo hostInfo = entry.getKey();
+ out.writeUTF(hostInfo.host());
+ out.writeInt(hostInfo.port());
+ writeTopicPartitions(out, entry.getValue());
+ }
- return ByteBuffer.wrap(baos.toByteArray());
+ out.flush();
+ out.close();
- } else {
- TaskAssignmentException ex = new TaskAssignmentException("Unable to encode assignment data: version=" + version);
- log.error(ex.getMessage(), ex);
- throw ex;
- }
+ return ByteBuffer.wrap(baos.toByteArray());
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
}
}
+    /**
+     * Writes a set of partitions as a count followed by one (topic, partition) pair per
+     * element, in the iteration order of the set.
+     *
+     * @param out        stream to write to
+     * @param partitions partitions to write
+     * @throws IOException if writing to the stream fails
+     */
+    private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException {
+        final int count = partitions.size();
+        out.writeInt(count);
+        for (final TopicPartition tp : partitions) {
+            final String topicName = tp.topic();
+            out.writeUTF(topicName);
+            final int partitionIndex = tp.partition();
+            out.writeInt(partitionIndex);
+        }
+    }
+
/**
* @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
*/
@@ -111,42 +126,55 @@ public class AssignmentInfo {
try {
// Decode version
int version = in.readInt();
- if (version == 1) {
- // Decode active tasks
- int count = in.readInt();
- List<TaskId> activeTasks = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- activeTasks.add(TaskId.readFrom(in));
- }
- // Decode standby tasks
- count = in.readInt();
- Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
- for (int i = 0; i < count; i++) {
- TaskId id = TaskId.readFrom(in);
-
- int numPartitions = in.readInt();
- Set<TopicPartition> partitions = new HashSet<>(numPartitions);
- for (int j = 0; j < numPartitions; j++) {
- partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
- }
- standbyTasks.put(id, partitions);
- }
-
- return new AssignmentInfo(activeTasks, standbyTasks);
-
- } else {
+ if (version < 0 || version > CURRENT_VERSION) {
TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
log.error(ex.getMessage(), ex);
throw ex;
}
+
+ // Decode active tasks
+ int count = in.readInt();
+ List<TaskId> activeTasks = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ activeTasks.add(TaskId.readFrom(in));
+ }
+ // Decode standby tasks
+ count = in.readInt();
+ Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
+ for (int i = 0; i < count; i++) {
+ TaskId id = TaskId.readFrom(in);
+ standbyTasks.put(id, readTopicPartitions(in));
+ }
+
+ Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>();
+ if (version == CURRENT_VERSION) {
+ int numEntries = in.readInt();
+ for (int i = 0; i < numEntries; i++) {
+ HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
+ hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in));
+ }
+ }
+
+ return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
+
+
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
}
+    /**
+     * Reads a set of partitions: a count followed by that many (topic, partition) pairs.
+     *
+     * @param in stream to read from
+     * @return the partitions read, as a new mutable set
+     * @throws IOException if reading from the stream fails
+     */
+    private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException {
+        final int expected = in.readInt();
+        final Set<TopicPartition> result = new HashSet<>(expected);
+        for (int remaining = expected; remaining > 0; remaining--) {
+            final String topicName = in.readUTF();
+            final int partitionIndex = in.readInt();
+            result.add(new TopicPartition(topicName, partitionIndex));
+        }
+        return result;
+    }
+
@Override
public int hashCode() {
- return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode();
+ return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHostState.hashCode(); // the host-to-partitions map is hashed too, in line with equals()
}
@Override
@@ -155,7 +183,8 @@ public class AssignmentInfo {
AssignmentInfo other = (AssignmentInfo) o;
return this.version == other.version &&
this.activeTasks.equals(other.activeTasks) &&
- this.standbyTasks.equals(other.standbyTasks);
+ this.standbyTasks.equals(other.standbyTasks) &&
+ this.partitionsByHostState.equals(other.partitionsByHostState);
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 874fea8..c3481c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -31,54 +32,59 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
- private static final int CURRENT_VERSION = 1;
+ private static final int CURRENT_VERSION = 2;
public final int version;
public final UUID processId;
public final Set<TaskId> prevTasks;
public final Set<TaskId> standbyTasks;
+ public final String userEndPoint;
- public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
- this(CURRENT_VERSION, processId, prevTasks, standbyTasks);
+ public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) { // userEndPoint may be null; encode() then writes a zero-length endpoint
+ this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint); // instances built through this constructor always carry the current version
}
- private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+ private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) { // explicit version: decode() passes the version it read from the buffer
this.version = version;
this.processId = processId;
this.prevTasks = prevTasks;
this.standbyTasks = standbyTasks;
+ this.userEndPoint = userEndPoint; // stored as given; decode() supplies null for version-1 data or a zero-length endpoint
}
/**
* @throws TaskAssignmentException if method fails to encode the data
*/
public ByteBuffer encode() {
- if (version == CURRENT_VERSION) {
- ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
- // version
- buf.putInt(version);
- // encode client UUID
- buf.putLong(processId.getMostSignificantBits());
- buf.putLong(processId.getLeastSignificantBits());
- // encode ids of previously running tasks
- buf.putInt(prevTasks.size());
- for (TaskId id : prevTasks) {
- id.writeTo(buf);
- }
- // encode ids of cached tasks
- buf.putInt(standbyTasks.size());
- for (TaskId id : standbyTasks) {
- id.writeTo(buf);
- }
- buf.rewind();
-
- return buf;
-
+ byte[] endPointBytes; // NOTE(review): nothing in the new body throws TaskAssignmentException explicitly; the @throws tag above looks stale - confirm
+ if (userEndPoint == null) { // a null endpoint is written as a zero-length byte array
+ endPointBytes = new byte[0];
} else {
- TaskAssignmentException ex = new TaskAssignmentException("unable to encode subscription data: version=" + version);
- log.error(ex.getMessage(), ex);
- throw ex;
+ endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8"));
}
+ ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 +
+ prevTasks.size() * 8 + 4 + standbyTasks.size() * 8 // NOTE(review): assumes each TaskId.writeTo emits 8 bytes - confirm
+ + 4 /* length of bytes */ + endPointBytes.length
+ );
+ // version
+ buf.putInt(version);
+ // encode client UUID
+ buf.putLong(processId.getMostSignificantBits());
+ buf.putLong(processId.getLeastSignificantBits());
+ // encode ids of previously running tasks
+ buf.putInt(prevTasks.size());
+ for (TaskId id : prevTasks) {
+ id.writeTo(buf);
+ }
+ // encode ids of cached tasks
+ buf.putInt(standbyTasks.size());
+ for (TaskId id : standbyTasks) {
+ id.writeTo(buf);
+ }
+ buf.putInt(endPointBytes.length); // length prefix for the endpoint; 0 when userEndPoint is null
+ buf.put(endPointBytes);
+ buf.rewind(); // position back to 0 so the returned buffer is read from its start
+ return buf;
}
/**
@@ -90,7 +96,7 @@ public class SubscriptionInfo {
// Decode version
int version = data.getInt();
- if (version == CURRENT_VERSION) {
+ if (version == CURRENT_VERSION || version == 1) {
// Decode client UUID
UUID processId = new UUID(data.getLong(), data.getLong());
// Decode previously active tasks
@@ -107,7 +113,17 @@ public class SubscriptionInfo {
standbyTasks.add(TaskId.readFrom(data));
}
- return new SubscriptionInfo(version, processId, prevTasks, standbyTasks);
+ String userEndPoint = null;
+ if (version == CURRENT_VERSION) {
+ int bytesLength = data.getInt();
+ if (bytesLength != 0) {
+ byte[] bytes = new byte[bytesLength];
+ data.get(bytes);
+ userEndPoint = new String(bytes, Charset.forName("UTF-8"));
+ }
+
+ }
+ return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint);
} else {
TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
@@ -118,7 +134,11 @@ public class SubscriptionInfo {
@Override
public int hashCode() {
- return version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+ // a null endpoint contributes 0, which leaves the XOR of the other fields unchanged
+ final int endPointHash = userEndPoint == null ? 0 : userEndPoint.hashCode();
+ return version
+ ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode()
+ ^ endPointHash;
}
@Override
@@ -128,7 +148,8 @@ public class SubscriptionInfo {
return this.version == other.version &&
this.processId.equals(other.processId) &&
this.prevTasks.equals(other.prevTasks) &&
- this.standbyTasks.equals(other.standbyTasks);
+ this.standbyTasks.equals(other.standbyTasks) && // the parentheses on the next line are required: && binds tighter than ?:, so without them the whole chain becomes the ternary's condition
+ (this.userEndPoint != null ? this.userEndPoint.equals(other.userEndPoint) : other.userEndPoint == null);
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
new file mode 100644
index 0000000..37a15e1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+/**
+ * Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
+ * Instances of this class can be obtained by calling one of:
+ * {@link KafkaStreams#allMetadata()}
+ * {@link KafkaStreams#allMetadataForStore(String)}
+ * {@link KafkaStreams#metadataForKey(String, Object, StreamPartitioner)}
+ * {@link KafkaStreams#metadataForKey(String, Object, Serializer)}
+ *
+ * The HostInfo is constructed during Partition Assignment
+ * see {@link org.apache.kafka.streams.processor.internals.StreamPartitionAssignor}
+ * It is extracted from the config {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG}
+ *
+ * If developers wish to expose an endpoint in their KafkaStreams applications they should provide the above
+ * config.
+ */
+public class HostInfo {
+    private final String host;
+    private final int port;
+    /** Creates an endpoint description from the given host name and port. */
+    public HostInfo(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    /** Two instances are equal when they have the same class, the same port and equal hosts. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final HostInfo that = (HostInfo) o;
+        return port == that.port && host.equals(that.host);
+    }
+
+    /** Hash built from host and port, so equal instances hash alike. */
+    @Override
+    public int hashCode() {
+        return 31 * host.hashCode() + port;
+    }
+
+    /** @return the host given at construction */
+    public String host() {
+        return host;
+    }
+
+    /** @return the port given at construction */
+    public int port() {
+        return port;
+    }
+
+    /** Renders as {@code HostInfo{host='<host>', port=<port>}}. */
+    @Override
+    public String toString() {
+        return "HostInfo{host='" + host + "', port=" + port + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
new file mode 100644
index 0000000..541221f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+
+import java.util.Set;
+
+/**
+ * Represents the state of an instance (process) in a {@link KafkaStreams} application.
+ * It contains the user supplied {@link HostInfo} that can be used by developers to build
+ * APIs and services to connect to other instances, the Set of state stores available on
+ * the instance and the Set of {@link TopicPartition}s available on the instance.
+ * NOTE: This is a point in time view. It may change when rebalances happen.
+ */
+public class StreamsMetadata {
+    private final HostInfo hostInfo;
+    private final Set<String> stateStoreNames;
+    private final Set<TopicPartition> topicPartitions;
+
+    /** Bundles an endpoint with its store names and partitions; both sets are kept as given (no copy). */
+    public StreamsMetadata(final HostInfo hostInfo, final Set<String> stateStoreNames,
+                           final Set<TopicPartition> topicPartitions) {
+        this.hostInfo = hostInfo;
+        this.stateStoreNames = stateStoreNames;
+        this.topicPartitions = topicPartitions;
+    }
+
+    /** @return the endpoint passed to the constructor */
+    public HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    /** @return the state store names passed to the constructor (the same Set instance) */
+    public Set<String> stateStoreNames() {
+        return stateStoreNames;
+    }
+
+    /** @return the topic partitions passed to the constructor (the same Set instance) */
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    /** @return shortcut for {@code hostInfo().host()} */
+    public String host() {
+        return hostInfo.host();
+    }
+
+    /** @return shortcut for {@code hostInfo().port()} */
+    public int port() {
+        return hostInfo.port();
+    }
+
+    /** Equal when the other object has the same class and equal host info, store names and partitions. */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final StreamsMetadata other = (StreamsMetadata) o;
+        return hostInfo.equals(other.hostInfo) && stateStoreNames.equals(other.stateStoreNames)
+                && topicPartitions.equals(other.topicPartitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * (31 * hostInfo.hashCode() + stateStoreNames.hashCode()) + topicPartitions.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "StreamsMetadata{hostInfo=" + hostInfo + ", stateStoreNames=" + stateStoreNames
+                + ", topicPartitions=" + topicPartitions + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 29b8b03..b15c2ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -17,8 +17,10 @@
package org.apache.kafka.streams;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
@@ -34,7 +36,7 @@ import static org.junit.Assert.assertTrue;
public class KafkaStreamsTest {
// We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
- // quick enough
+ // quick enough)
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
@@ -118,6 +120,45 @@ public class KafkaStreamsTest {
}
}
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+        // the instance is only constructed, never started, so the lookup is expected to throw
+        createKafkaStreams().allMetadata();
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+        // the instance is only constructed, never started, so the lookup is expected to throw
+        createKafkaStreams().allMetadataForStore("store");
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+        // the instance is only constructed, never started, so the lookup is expected to throw
+        createKafkaStreams().metadataForKey("store", "key", Serdes.String().serializer());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+        final KafkaStreams notStarted = createKafkaStreams();
+        // the partitioner's result is irrelevant here: the call should fail before using it
+        final StreamPartitioner<String, Object> alwaysZero = new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 0;
+            }
+        };
+        notStarted.metadataForKey("store", "key", alwaysZero);
+    }
+
+
+ private KafkaStreams createKafkaStreams() {
+ Properties props = new Properties();
+ props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+ props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+ KStreamBuilder builder = new KStreamBuilder();
+ return new KafkaStreams(builder, props);
+ }
+
@Test
public void testCleanup() throws Exception {
final Properties props = new Properties();
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 5a30af5..0892893 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
@@ -338,7 +339,7 @@ public class RegexSourceIntegrationTest {
private int index = 0;
public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
- super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time);
+ super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder)); // the superclass constructor now takes a StreamsMetadataState; a fresh one is created from the same builder
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index c776b8a..b951743 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -26,6 +26,9 @@ import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.After;
import org.junit.Test;
+import java.util.Map;
+import java.util.Set;
+
import static org.junit.Assert.assertEquals;
public class KStreamBuilderTest {
@@ -89,6 +92,54 @@ public class KStreamBuilderTest {
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed);
}
+    @Test
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        // three independent sources are merged and counted into the "my-table" store
+        final KStream<String, String> first = builder.stream("topic-1");
+        final KStream<String, String> second = builder.stream("topic-2");
+        final KStream<String, String> third = builder.stream("topic-3");
+        builder.merge(first, second, third).groupByKey().count("my-table");
+
+        // the store must be traced back to every topic that feeds the merged stream
+        final Map<String, Set<String>> storeToTopics = builder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), storeToTopics.get("my-table"));
+    }
+
+    @Test
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors() throws Exception {
+        // pass-through operators: the mapper returns its input and the predicate accepts everything
+        final ValueMapper<String, String> identity = new ValueMapper<String, String>() {
+            @Override
+            public String apply(final String value) {
+                return value;
+            }
+        };
+        final Predicate<String, String> acceptAll = new Predicate<String, String>() {
+            @Override
+            public boolean test(final String key, final String value) {
+                return true;
+            }
+        };
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> first = builder.stream("topic-1");
+        final KStream<String, String> second = builder.stream("topic-2");
+        final KStream<String, String> viaMapAndFilter = first.mapValues(identity).filter(acceptAll);
+        final KStream<String, String> viaFilter = second.filter(acceptAll);
+        builder.merge(viaMapAndFilter, viaFilter).groupByKey().count("my-table");
+
+        // intermediate operators must not hide the original source topics of the store
+        final Map<String, Set<String>> storeToTopics = builder.stateStoreNameToSourceTopics();
+        assertEquals(Utils.mkSet("topic-1", "topic-2"), storeToTopics.get("my-table"));
+    }
+
@Test(expected = TopologyBuilderException.class)
public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
new KStreamBuilder().stream();
http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f6ca6db..6f047b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -404,4 +404,39 @@ public class TopologyBuilderTest {
return nodeNames;
}
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+        final TopologyBuilder topology = new TopologyBuilder();
+        topology.addSource("source", "topic");
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        // NOTE(review): the 'true' argument is presumably the "internal" flag this test is named after - confirm
+        topology.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+
+        final Map<String, Set<String>> storeToTopics = topology.stateStoreNameToSourceTopics();
+        assertEquals(1, storeToTopics.size());
+        assertEquals(Collections.singleton("topic"), storeToTopics.get("store"));
+    }
+
+    @Test
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+        final TopologyBuilder topology = new TopologyBuilder();
+        topology.addSource("source", "topic");
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        // NOTE(review): the 'false' argument is presumably the "internal" flag, i.e. an external supplier - confirm
+        topology.addStateStore(new MockStateStoreSupplier("store", false), false, "processor");
+
+        final Map<String, Set<String>> storeToTopics = topology.stateStoreNameToSourceTopics();
+        assertEquals(1, storeToTopics.size());
+        assertEquals(Collections.singleton("topic"), storeToTopics.get("store"));
+    }
+
+    @Test
+    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+        final TopologyBuilder topology = new TopologyBuilder();
+        topology.setApplicationId("appId");
+        topology.addInternalTopic("internal-topic");
+        topology.addSource("source", "internal-topic");
+        topology.addProcessor("processor", new MockProcessorSupplier(), "source");
+        topology.addStateStore(new MockStateStoreSupplier("store", false), true, "processor");
+
+        // the expected name is the internal topic prefixed with "<applicationId>-"
+        final Map<String, Set<String>> storeToTopics = topology.stateStoreNameToSourceTopics();
+        assertEquals(1, storeToTopics.size());
+        assertEquals(Collections.singleton("appId-internal-topic"), storeToTopics.get("store"));
+    }
+
}