Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/23 15:28:36 UTC
[1/5] kafka git commit: HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior
Repository: kafka
Updated Branches:
refs/heads/0.10.1 3c0713923 -> f91d95ac9
HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy, Guozhang Wang
Closes #2038 from enothereska/hotfix-put-cache
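For illustration, a minimal sketch of the corrected flow, assuming a simplified LRU cache (the class and member names below are illustrative, not the actual Kafka Streams internals): every write path now funnels through a point that triggers eviction, so putAll and putIfAbsent can no longer grow the cache past its bound.

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified stand-in for ThreadCache: all writes trigger maybeEvict().
    class SimpleLruCache<K, V> {
        private final int maxEntries;
        private final LinkedHashMap<K, V> cache = new LinkedHashMap<>(16, 0.75f, true);
        private long numPuts = 0;
        private long evicts = 0;

        SimpleLruCache(final int maxEntries) {
            this.maxEntries = maxEntries;
        }

        void put(final K key, final V value) {
            cache.put(key, value);
            numPuts++;
            maybeEvict();
        }

        V putIfAbsent(final K key, final V value) {
            final V previous = cache.putIfAbsent(key, value);
            maybeEvict();             // the fix: evict on this path too
            if (previous == null) {
                numPuts++;            // only count calls that actually inserted
            }
            return previous;
        }

        void putAll(final List<Map.Entry<K, V>> entries) {
            for (final Map.Entry<K, V> entry : entries) {
                put(entry.getKey(), entry.getValue());  // the fix: per-entry eviction
            }
        }

        private void maybeEvict() {
            while (cache.size() > maxEntries) {
                final K eldest = cache.keySet().iterator().next();
                cache.remove(eldest);  // the real cache flushes dirty entries here
                evicts++;
            }
        }

        long evicts() {
            return evicts;
        }
    }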
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4fdbc72f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4fdbc72f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4fdbc72f
Branch: refs/heads/0.10.1
Commit: 4fdbc72fe099cbab200cc690f6075c69cbbefb4a
Parents: 3c07139
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Oct 19 14:01:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:15:11 2016 -0800
----------------------------------------------------------------------
.../streams/state/internals/ThreadCache.java | 14 ++++++--
.../state/internals/ThreadCacheTest.java | 37 ++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fdbc72f/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index d76e5c8..f7355d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -123,12 +123,20 @@ public class ThreadCache {
public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
final NamedCache cache = getOrCreateCache(namespace);
- return cache.putIfAbsent(Bytes.wrap(key), value);
+
+ final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value);
+ maybeEvict(namespace);
+
+ if (result == null) {
+ numPuts++;
+ }
+ return result;
}
public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
- final NamedCache cache = getOrCreateCache(namespace);
- cache.putAll(entries);
+ for (KeyValue<byte[], LRUCacheEntry> entry : entries) {
+ put(namespace, entry.key, entry.value);
+ }
}
public LRUCacheEntry delete(final String namespace, final byte[] key) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fdbc72f/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 2ff3b89..b07da6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -389,6 +389,24 @@ public class ThreadCacheTest {
}
@Test
+ public void shouldEvictAfterPutAll() throws Exception {
+ final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+ final String namespace = "namespace";
+ final ThreadCache cache = new ThreadCache(1);
+ cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+ @Override
+ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+ received.addAll(dirty);
+ }
+ });
+
+ cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
+ KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
+
+ assertEquals(cache.evicts(), 2);
+ }
+
+ @Test
public void shouldPutAll() throws Exception {
final ThreadCache cache = new ThreadCache(100000);
@@ -422,6 +440,25 @@ public class ThreadCacheTest {
assertArrayEquals(value, cache.get("n", key).value);
}
+ @Test
+ public void shouldEvictAfterPutIfAbsent() throws Exception {
+ final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+ final String namespace = "namespace";
+ final ThreadCache cache = new ThreadCache(1);
+ cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+ @Override
+ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+ received.addAll(dirty);
+ }
+ });
+
+ cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
+ cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+ cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+
+ assertEquals(cache.evicts(), 3);
+ }
+
private LRUCacheEntry dirtyEntry(final byte[] key) {
return new LRUCacheEntry(key, true, -1, -1, -1, "");
}
[3/5] kafka git commit: KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic
Posted by gu...@apache.org.
KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic
- reworked to use a single KafkaConsumer and subscribe only once
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2049 from mjsax/improveResetTool
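The essence of the rework, as a hedged sketch (the method and parameter names are illustrative; see the diff below for the real code): instead of creating a consumer and joining the group once per topic, one consumer subscribes to every relevant topic, and a single seek/commit round covers all partitions.

    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    class SingleConsumerReset {
        // topicsToReset are seeked to the beginning, topicsToSeekToEnd to the end.
        static void resetOffsets(final Properties config,
                                 final Set<String> topicsToReset,
                                 final Set<String> topicsToSeekToEnd) {
            final Set<String> allTopics = new HashSet<>(topicsToReset);
            allTopics.addAll(topicsToSeekToEnd);

            try (final KafkaConsumer<byte[], byte[]> client =
                     new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                client.subscribe(allTopics);  // join the consumer group exactly once
                client.poll(1);               // trigger the partition assignment

                final Set<TopicPartition> toBeginning = new HashSet<>();
                final Set<TopicPartition> toEnd = new HashSet<>();
                for (final TopicPartition p : client.assignment()) {
                    (topicsToReset.contains(p.topic()) ? toBeginning : toEnd).add(p);
                }

                client.seekToBeginning(toBeginning);
                client.seekToEnd(toEnd);
                for (final TopicPartition p : client.assignment()) {
                    client.position(p);  // resolve the seeks before committing
                }
                client.commitSync();     // one commit covers every topic
            }
        }
    }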
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2dedd8d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2dedd8d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2dedd8d9
Branch: refs/heads/0.10.1
Commit: 2dedd8d95aa56214f82822c54ff50e0ef9c4f2c8
Parents: 0a24d3a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Oct 24 13:44:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:26:30 2016 -0800
----------------------------------------------------------------------
.../main/scala/kafka/tools/StreamsResetter.java | 155 +++++++++----------
1 file changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2dedd8d9/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 7153790..1bb63f7 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.io.IOException;
-import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
@@ -78,8 +78,8 @@ public class StreamsResetter {
}
public int run(final String[] args, final Properties config) {
- this.consumerConfig.clear();
- this.consumerConfig.putAll(config);
+ consumerConfig.clear();
+ consumerConfig.putAll(config);
int exitCode = EXIT_CODE_SUCCESS;
@@ -95,16 +95,15 @@ public class StreamsResetter {
"Make sure to stop all running application instances before running the reset tool.");
}
- zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
+ zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
30000,
30000,
JaasUtils.isZkSecurityEnabled());
- this.allTopics.clear();
- this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+ allTopics.clear();
+ allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
- resetInputAndInternalTopicOffsets();
- seekToEndIntermediateTopics();
+ resetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
deleteInternalTopics(zkUtils);
} catch (final Throwable e) {
exitCode = EXIT_CODE_ERROR;
@@ -150,111 +149,105 @@ public class StreamsResetter {
.describedAs("list");
try {
- this.options = optionParser.parse(args);
+ options = optionParser.parse(args);
} catch (final OptionException e) {
optionParser.printHelpOn(System.err);
throw e;
}
}
- private void resetInputAndInternalTopicOffsets() {
- final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
+ private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+ final List<String> inputTopics = options.valuesOf(inputTopicsOption);
+ final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
- if (inputTopics.size() == 0) {
- System.out.println("No input topics specified.");
+ if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
+ System.out.println("No input or intermediate topics specified. Skipping seek.");
+ return;
} else {
- System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+ if (inputTopics.size() != 0) {
+ System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+ }
+ if (intermediateTopics.size() != 0) {
+ System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
+ }
}
final Properties config = new Properties();
- config.putAll(this.consumerConfig);
- config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
- config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+ config.putAll(consumerConfig);
+ config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
+ config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption));
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- for (final String inTopic : inputTopics) {
- if (!this.allTopics.contains(inTopic)) {
- System.out.println("Input topic " + inTopic + " not found. Skipping.");
+ final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size() + intermediateTopics.size());
+ for (final String topic : inputTopics) {
+ if (!allTopics.contains(topic)) {
+ System.err.println("Input topic " + topic + " not found. Skipping.");
+ } else {
+ topicsToSubscribe.add(topic);
+ }
+ }
+ for (final String topic : intermediateTopics) {
+ if (!allTopics.contains(topic)) {
+ System.err.println("Intermediate topic " + topic + " not found. Skipping.");
+ } else {
+ topicsToSubscribe.add(topic);
+ }
+ }
+ for (final String topic : allTopics) {
+ if (isInternalTopic(topic)) {
+ topicsToSubscribe.add(topic);
}
}
- for (final String topic : this.allTopics) {
- if (isInputTopic(topic) || isInternalTopic(topic)) {
- System.out.println("Topic: " + topic);
+ try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+ client.subscribe(topicsToSubscribe);
+ client.poll(1);
+
+ final Set<TopicPartition> partitions = client.assignment();
+ final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+ final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
+
+ for (final TopicPartition p : partitions) {
+ final String topic = p.topic();
+ if (isInputTopic(topic) || isInternalTopic(topic)) {
+ inputAndInternalTopicPartitions.add(p);
+ } else if (isIntermediateTopic(topic)) {
+ intermediateTopicPartitions.add(p);
+ } else {
+ System.err.println("Skipping invalid partition: " + p);
+ }
+ }
- try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
- client.subscribe(Collections.singleton(topic));
- client.poll(1);
+ client.seekToBeginning(inputAndInternalTopicPartitions);
+ client.seekToEnd(intermediateTopicPartitions);
- final Set<TopicPartition> partitions = client.assignment();
- client.seekToBeginning(partitions);
- for (final TopicPartition p : partitions) {
- client.position(p);
- }
- client.commitSync();
- } catch (final RuntimeException e) {
- System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
- throw e;
- }
+ for (final TopicPartition p : partitions) {
+ client.position(p);
}
+ client.commitSync();
+ } catch (final RuntimeException e) {
+ System.err.println("ERROR: Resetting offsets failed.");
+ throw e;
}
System.out.println("Done.");
}
private boolean isInputTopic(final String topic) {
- return this.options.valuesOf(inputTopicsOption).contains(topic);
+ return options.valuesOf(inputTopicsOption).contains(topic);
}
- private void seekToEndIntermediateTopics() {
- final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
-
- if (intermediateTopics.size() == 0) {
- System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
- return;
- }
-
- System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
-
- final Properties config = new Properties();
- config.putAll(this.consumerConfig);
- config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
- config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
- config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- for (final String topic : intermediateTopics) {
- if (this.allTopics.contains(topic)) {
- System.out.println("Topic: " + topic);
-
- try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
- client.subscribe(Collections.singleton(topic));
- client.poll(1);
-
- final Set<TopicPartition> partitions = client.assignment();
- client.seekToEnd(partitions);
- for (final TopicPartition p : partitions) {
- client.position(p);
- }
- client.commitSync();
- } catch (final RuntimeException e) {
- System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
- throw e;
- }
- } else {
- System.out.println("Topic " + topic + " not found. Skipping.");
- }
- }
-
- System.out.println("Done.");
+ private boolean isIntermediateTopic(final String topic) {
+ return options.valuesOf(intermediateTopicsOption).contains(topic);
}
private void deleteInternalTopics(final ZkUtils zkUtils) {
- System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
+ System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
- for (final String topic : this.allTopics) {
+ for (final String topic : allTopics) {
if (isInternalTopic(topic)) {
final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
- "--zookeeper", this.options.valueOf(zookeeperOption),
+ "--zookeeper", options.valueOf(zookeeperOption),
"--delete", "--topic", topic});
try {
TopicCommand.deleteTopic(zkUtils, commandOptions);
@@ -269,7 +262,7 @@ public class StreamsResetter {
}
private boolean isInternalTopic(final String topicName) {
- return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
+ return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}
[2/5] kafka git commit: HOTFIX: follow up on KAFKA-4275
Posted by gu...@apache.org.
HOTFIX: follow up on KAFKA-4275
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2039 from mjsax/hotfix-ktableLeftJoin
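The fix matters for chained KTable-KTable joins: the result of a join is not materialized in its own store, so connecting the join processors by a single store name failed, whereas valueGetterSupplier().storeNames() resolves the underlying source stores. A minimal usage sketch against the 0.10.1 API, mirroring the new integration test (topic names are illustrative):

    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.ValueJoiner;

    public class ChainedKTableJoin {
        public static void main(final String[] args) {
            final KStreamBuilder builder = new KStreamBuilder();
            final KTable<String, String> t1 = builder.table("table1", "table1");
            final KTable<String, String> t2 = builder.table("table2", "table2");
            final KTable<String, String> t3 = builder.table("table3", "table3");

            final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
                @Override
                public String apply(final String v1, final String v2) {
                    return v1 + "-" + v2;
                }
            };

            // The intermediate KTable has no store of its own; the second join
            // now connects to the source tables' stores and no longer fails.
            t1.leftJoin(t2, joiner).outerJoin(t3, joiner).to("output");
        }
    }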
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a24d3a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a24d3a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a24d3a2
Branch: refs/heads/0.10.1
Commit: 0a24d3a258a544314d8ea995157a351f9ca8994e
Parents: 4fdbc72
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 19 20:58:18 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:18:15 2016 -0800
----------------------------------------------------------------------
.../streams/kstream/internals/KTableImpl.java | 14 +-
.../KTableKTableJoinIntegrationTest.java | 280 +++++++++++++++++++
2 files changed, 287 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0a24d3a2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 7ce0bbb..06feb0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -297,8 +297,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
- topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+ topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+ topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@@ -325,8 +325,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
- topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+ topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+ topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@@ -352,8 +352,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
- topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
- topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+ topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+ topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0a24d3a2/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
new file mode 100644
index 0000000..85e2cf7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableKTableJoinIntegrationTest {
+ private final static int NUM_BROKERS = 1;
+
+ @ClassRule
+ public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ private final static MockTime MOCK_TIME = CLUSTER.time;
+ private final static String TABLE_1 = "table1";
+ private final static String TABLE_2 = "table2";
+ private final static String TABLE_3 = "table3";
+ private final static String OUTPUT = "output-";
+ private static Properties streamsConfig;
+ private KafkaStreams streams;
+ private final static Properties CONSUMER_CONFIG = new Properties();
+
+ @Parameterized.Parameter(value = 0)
+ public JoinType joinType1;
+ @Parameterized.Parameter(value = 1)
+ public JoinType joinType2;
+ @Parameterized.Parameter(value = 2)
+ public List<KeyValue<String, String>> expectedResult;
+
+ //Single parameter, use Object[]
+ @Parameterized.Parameters
+ public static Object[] parameters() {
+ return new Object[][]{
+ {JoinType.INNER, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null))
+ },
+ {JoinType.INNER, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null)
+ )},
+ {JoinType.INNER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C3")
+ )},
+ {JoinType.LEFT, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null)
+ )},
+ {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", null)
+ )},
+ {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C3")
+ )},
+ {JoinType.OUTER, JoinType.INNER, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")
+ )},
+ {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
+ new KeyValue<>("a", null),
+ new KeyValue<>("b", null),
+ new KeyValue<>("c", null),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")
+ )},
+ {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+ new KeyValue<>("a", "null-A3"),
+ new KeyValue<>("b", "null-B3"),
+ new KeyValue<>("c", "null-C3"),
+ new KeyValue<>("a", "A1-null-A3"),
+ new KeyValue<>("b", "B1-null-B3"),
+ new KeyValue<>("b", "B1-B2-B3"),
+ new KeyValue<>("c", "null-C2-C3")
+ )}
+ };
+ }
+
+ public static Object[] data() {
+ return new Object[]{0, 10 * 1024 * 1024L};
+ }
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ CLUSTER.createTopic(TABLE_1);
+ CLUSTER.createTopic(TABLE_2);
+ CLUSTER.createTopic(TABLE_3);
+ CLUSTER.createTopic(OUTPUT);
+
+ streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+ streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+ streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+ final Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ final List<KeyValue<String, String>> table1 = Arrays.asList(
+ new KeyValue<>("a", "A1"),
+ new KeyValue<>("b", "B1")
+ );
+
+ final List<KeyValue<String, String>> table2 = Arrays.asList(
+ new KeyValue<>("b", "B2"),
+ new KeyValue<>("c", "C2")
+ );
+
+ final List<KeyValue<String, String>> table3 = Arrays.asList(
+ new KeyValue<>("a", "A3"),
+ new KeyValue<>("b", "B3"),
+ new KeyValue<>("c", "C3")
+ );
+
+ // put table 3 first, to make sure data is there when joining T1 with T2
+ IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
+ IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
+ IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
+
+
+ CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
+ CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ }
+
+ @Before
+ public void before() throws Exception {
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+ }
+
+ @After
+ public void after() throws Exception {
+ if (streams != null) {
+ streams.close();
+ streams = null;
+ }
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+ }
+
+ private enum JoinType {
+ INNER, LEFT, OUTER
+ }
+
+ private KafkaStreams prepareTopology() {
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
+ final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
+ final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
+
+ join(join(table1, table2, joinType1), table3, joinType2).to(OUTPUT);
+
+ return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
+ }
+
+ private KTable<String, String> join(KTable<String, String> first, KTable<String, String> second, JoinType joinType) {
+ final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+ @Override
+ public String apply(final String value1, final String value2) {
+ return value1 + "-" + value2;
+ }
+ };
+
+ switch (joinType) {
+ case INNER:
+ return first.join(second, joiner);
+ case LEFT:
+ return first.leftJoin(second, joiner);
+ case OUTER:
+ return first.outerJoin(second, joiner);
+ }
+
+ throw new RuntimeException("Unknown join type.");
+ }
+
+ @Test
+ public void KTableKTableJoin() throws Exception {
+ System.out.println("join: " + joinType1 + "-" + joinType2);
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+
+ streams = prepareTopology();
+ streams.start();
+
+
+ final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ CONSUMER_CONFIG,
+ OUTPUT,
+ expectedResult.size());
+
+ assertThat(result, equalTo(expectedResult));
+ }
+
+}
[5/5] kafka git commit: KAFKA-4361: Streams does not respect user configs for "default" params
Posted by gu...@apache.org.
KAFKA-4361: Streams does not respect user configs for "default" params
Enable user-provided consumer and producer configs to override the Streams default configs.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2084 from dguy/kafka-4361
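The core of the change is the merge order, sketched here under simplified assumptions (merge() is a hypothetical helper; the real code inlines this in getConsumerConfigs, getRestoreConsumerConfigs, and getProducerConfigs): the defaults seed the map and user-provided values are layered on top, where previously the defaults were applied last and silently clobbered user values.

    import java.util.HashMap;
    import java.util.Map;

    class ConfigMerge {
        // Defaults first, user-provided values last, so the user wins.
        static Map<String, Object> merge(final Map<String, Object> defaults,
                                         final Map<String, Object> userProvided) {
            final Map<String, Object> props = new HashMap<>(defaults);
            props.putAll(userProvided);
            return props;
        }
    }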
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f91d95ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f91d95ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f91d95ac
Branch: refs/heads/0.10.1
Commit: f91d95ac9aa6c5fe9b7ee091b7759fee53e465d4
Parents: 34f987f
Author: Damian Guy <da...@gmail.com>
Authored: Tue Nov 1 10:07:58 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:28:24 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 21 ++++++----
.../apache/kafka/streams/StreamsConfigTest.java | 43 ++++++++++++++++++++
2 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f91d95ac/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 66c15b9..5ba4383 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -321,16 +321,19 @@ public class StreamsConfig extends AbstractConfig {
* @throws ConfigException
*/
public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
- final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+
+ final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+ final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
- if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
- consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+ consumerProps.putAll(clientProvidedProps);
// bootstrap.servers should be from StreamsConfig
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -362,16 +365,18 @@ public class StreamsConfig extends AbstractConfig {
* @throws ConfigException
*/
public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException {
- Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+ Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+ final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
// disable auto commit and throw exception if there is user overridden values,
// this is necessary for streams commit semantics
- if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ ", as the streams client will always turn off auto committing.");
}
- consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+ consumerProps.putAll(clientProvidedProps);
// bootstrap.servers should be from StreamsConfig
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -396,8 +401,8 @@ public class StreamsConfig extends AbstractConfig {
*/
public Map<String, Object> getProducerConfigs(String clientId) {
// generate producer configs from original properties and overridden maps
- final Map<String, Object> props = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- props.putAll(PRODUCER_DEFAULT_OVERRIDES);
+ final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+ props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()));
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
http://git-wip-us.apache.org/repos/asf/kafka/blob/f91d95ac/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3caa767..f03bed9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -214,6 +215,48 @@ public class StreamsConfigTest {
streamsConfig.valueSerde();
}
+ @Test
+ public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+ }
+
+ @Test
+ public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+ props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
+ assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
+ }
+
+ @Test
+ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+ assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getConsumerConfigs(null, "a", "b");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+ props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getRestoreConsumerConfigs("client");
+ }
+
static class MisconfiguredSerde implements Serde {
@Override
public void configure(final Map configs, final boolean isKey) {
[4/5] kafka git commit: MINOR: improve JavaDoc for Streams window retention time
Posted by gu...@apache.org.
MINOR: improve JavaDoc for Streams window retention time
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2068 from mjsax/hotfixImproveWindowRetentionTimeJavaDoc
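For context, a small usage sketch of the API the JavaDoc describes, assuming the 0.10.1 TimeWindows class (the values are illustrative):

    import org.apache.kafka.streams.kstream.TimeWindows;

    public class RetentionExample {
        public static void main(final String[] args) {
            final TimeWindows windows = TimeWindows.of(60 * 1000L);  // 1-minute windows
            // until() is a guaranteed lower bound in streams time, not wall-clock time
            windows.until(24L * 60 * 60 * 1000);  // keep each window for at least one day
            System.out.println("maintainMs = " + windows.maintainMs());
        }
    }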
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34f987f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34f987f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34f987f6
Branch: refs/heads/0.10.1
Commit: 34f987f60a49eb12b67d88fbbac35a2081dd4081
Parents: 2dedd8d
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 26 12:57:57 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:27:54 2016 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kafka/streams/kstream/Windows.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/34f987f6/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index f060d39..5a4f0d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -40,7 +40,8 @@ public abstract class Windows<W extends Window> {
}
/**
- * Set the window maintain duration in milliseconds of system time.
+ * Set the window maintain duration in milliseconds of streams time.
+ * This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
*
* @return itself
*/