Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/23 15:28:36 UTC

[1/5] kafka git commit: HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 3c0713923 -> f91d95ac9


HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2038 from enothereska/hotfix-put-cache
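
In short, putAll now delegates to put, and putIfAbsent runs the same eviction check as a plain put. A minimal sketch of the corrected behavior, modeled on the new tests below (a ThreadCache with a 1-byte capacity, so every insertion overflows):

    final ThreadCache cache = new ThreadCache(1); // 1-byte cache: every put overflows
    cache.addDirtyEntryFlushListener("namespace", new ThreadCache.DirtyEntryFlushListener() {
        @Override
        public void apply(final List<ThreadCache.DirtyEntry> dirty) {
            // evicted dirty entries are flushed through this listener
        }
    });

    cache.putAll("namespace", Arrays.asList(
        KeyValue.pair(new byte[]{0}, new LRUCacheEntry(new byte[]{5}, true, -1, -1, -1, "")),
        KeyValue.pair(new byte[]{1}, new LRUCacheEntry(new byte[]{6}, true, -1, -1, -1, ""))));

    // before the fix, putAll and putIfAbsent skipped maybeEvict(), so evicts() stayed at 0
    assert cache.evicts() == 2;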


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4fdbc72f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4fdbc72f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4fdbc72f

Branch: refs/heads/0.10.1
Commit: 4fdbc72fe099cbab200cc690f6075c69cbbefb4a
Parents: 3c07139
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Oct 19 14:01:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:15:11 2016 -0800

----------------------------------------------------------------------
 .../streams/state/internals/ThreadCache.java    | 14 ++++++--
 .../state/internals/ThreadCacheTest.java        | 37 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4fdbc72f/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index d76e5c8..f7355d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -123,12 +123,20 @@ public class ThreadCache {
 
     public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
         final NamedCache cache = getOrCreateCache(namespace);
-        return cache.putIfAbsent(Bytes.wrap(key), value);
+
+        final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value);
+        maybeEvict(namespace);
+
+        if (result == null) {
+            numPuts++;
+        }
+        return result;
     }
 
     public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
-        final NamedCache cache = getOrCreateCache(namespace);
-        cache.putAll(entries);
+        for (KeyValue<byte[], LRUCacheEntry> entry : entries) {
+            put(namespace, entry.key, entry.value);
+        }
     }
 
     public LRUCacheEntry delete(final String namespace, final byte[] key) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fdbc72f/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 2ff3b89..b07da6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -389,6 +389,24 @@ public class ThreadCacheTest {
     }
 
     @Test
+    public void shouldEvictAfterPutAll() throws Exception {
+        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+        final String namespace = "namespace";
+        final ThreadCache cache = new ThreadCache(1);
+        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                received.addAll(dirty);
+            }
+        });
+
+        cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
+            KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
+
+        assertEquals(cache.evicts(), 2);
+    }
+
+    @Test
     public void shouldPutAll() throws Exception {
         final ThreadCache cache = new ThreadCache(100000);
 
@@ -422,6 +440,25 @@ public class ThreadCacheTest {
         assertArrayEquals(value, cache.get("n", key).value);
     }
 
+    @Test
+    public void shouldEvictAfterPutIfAbsent() throws Exception {
+        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+        final String namespace = "namespace";
+        final ThreadCache cache = new ThreadCache(1);
+        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                received.addAll(dirty);
+            }
+        });
+
+        cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
+        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+
+        assertEquals(cache.evicts(), 3);
+    }
+
     private LRUCacheEntry dirtyEntry(final byte[] key) {
         return new LRUCacheEntry(key, true, -1, -1, -1, "");
     }


[3/5] kafka git commit: KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

Posted by gu...@apache.org.
KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic

- reworked to use a single KafkaConsumer and subscribe only once

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2049 from mjsax/improveResetTool
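
The core of the rework is a single consumer that subscribes once to the union of input, internal, and intermediate topics and then splits the resulting assignment, instead of joining the group once per topic. A condensed sketch of that pattern, using the same KafkaConsumer calls as the patch below (topicsToSubscribe and the two partition sets are built as in the diff):

    try (final KafkaConsumer<byte[], byte[]> client =
             new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
        client.subscribe(topicsToSubscribe); // one subscribe/group join for all topics
        client.poll(1);                      // force the partition assignment

        // split the single assignment instead of creating one consumer per topic
        client.seekToBeginning(inputAndInternalTopicPartitions);
        client.seekToEnd(intermediateTopicPartitions);

        for (final TopicPartition p : client.assignment()) {
            client.position(p);              // evaluate the lazy seeks
        }
        client.commitSync();                 // commit all new offsets in one round trip
    }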


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2dedd8d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2dedd8d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2dedd8d9

Branch: refs/heads/0.10.1
Commit: 2dedd8d95aa56214f82822c54ff50e0ef9c4f2c8
Parents: 0a24d3a
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Oct 24 13:44:27 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:26:30 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 155 +++++++++----------
 1 file changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2dedd8d9/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 7153790..1bb63f7 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
@@ -78,8 +78,8 @@ public class StreamsResetter {
     }
 
     public int run(final String[] args, final Properties config) {
-        this.consumerConfig.clear();
-        this.consumerConfig.putAll(config);
+        consumerConfig.clear();
+        consumerConfig.putAll(config);
 
         int exitCode = EXIT_CODE_SUCCESS;
 
@@ -95,16 +95,15 @@ public class StreamsResetter {
                     "Make sure to stop all running application instances before running the reset tool.");
             }
 
-            zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
+            zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
                 30000,
                 30000,
                 JaasUtils.isZkSecurityEnabled());
 
-            this.allTopics.clear();
-            this.allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            allTopics.clear();
+            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
 
-            resetInputAndInternalTopicOffsets();
-            seekToEndIntermediateTopics();
+            resetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
             deleteInternalTopics(zkUtils);
         } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
@@ -150,111 +149,105 @@ public class StreamsResetter {
             .describedAs("list");
 
         try {
-            this.options = optionParser.parse(args);
+            options = optionParser.parse(args);
         } catch (final OptionException e) {
             optionParser.printHelpOn(System.err);
             throw e;
         }
     }
 
-    private void resetInputAndInternalTopicOffsets() {
-        final List<String> inputTopics = this.options.valuesOf(inputTopicsOption);
+    private void resetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
+        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
-        if (inputTopics.size() == 0) {
-            System.out.println("No input topics specified.");
+        if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
+            System.out.println("No input or intermediate topics specified. Skipping seek.");
+            return;
         } else {
-            System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+            if (inputTopics.size() != 0) {
+                System.out.println("Resetting offsets to zero for input topics " + inputTopics + " and all internal topics.");
+            }
+            if (intermediateTopics.size() != 0) {
+                System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
+            }
         }
 
         final Properties config = new Properties();
-        config.putAll(this.consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
+        config.putAll(consumerConfig);
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(applicationIdOption));
         config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
-        for (final String inTopic : inputTopics) {
-            if (!this.allTopics.contains(inTopic)) {
-                System.out.println("Input topic " + inTopic + " not found. Skipping.");
+        final Set<String> topicsToSubscribe = new HashSet<>(inputTopics.size() + intermediateTopics.size());
+        for (final String topic : inputTopics) {
+            if (!allTopics.contains(topic)) {
+                System.err.println("Input topic " + topic + " not found. Skipping.");
+            } else {
+                topicsToSubscribe.add(topic);
+            }
+        }
+        for (final String topic : intermediateTopics) {
+            if (!allTopics.contains(topic)) {
+                System.err.println("Intermediate topic " + topic + " not found. Skipping.");
+            } else {
+                topicsToSubscribe.add(topic);
+            }
+        }
+        for (final String topic : allTopics) {
+            if (isInternalTopic(topic)) {
+                topicsToSubscribe.add(topic);
             }
         }
 
-        for (final String topic : this.allTopics) {
-            if (isInputTopic(topic) || isInternalTopic(topic)) {
-                System.out.println("Topic: " + topic);
+        try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+            client.subscribe(topicsToSubscribe);
+            client.poll(1);
+
+            final Set<TopicPartition> partitions = client.assignment();
+            final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+            final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
+
+            for (final TopicPartition p : partitions) {
+                final String topic = p.topic();
+                if (isInputTopic(topic) || isInternalTopic(topic)) {
+                    inputAndInternalTopicPartitions.add(p);
+                } else if (isIntermediateTopic(topic)) {
+                    intermediateTopicPartitions.add(p);
+                } else {
+                    System.err.println("Skipping invalid partition: " + p);
+                }
+            }
 
-                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-                    client.subscribe(Collections.singleton(topic));
-                    client.poll(1);
+            client.seekToBeginning(inputAndInternalTopicPartitions);
+            client.seekToEnd(intermediateTopicPartitions);
 
-                    final Set<TopicPartition> partitions = client.assignment();
-                    client.seekToBeginning(partitions);
-                    for (final TopicPartition p : partitions) {
-                        client.position(p);
-                    }
-                    client.commitSync();
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Resetting offsets for topic " + topic + " failed.");
-                    throw e;
-                }
+            for (final TopicPartition p : partitions) {
+                client.position(p);
             }
+            client.commitSync();
+        } catch (final RuntimeException e) {
+            System.err.println("ERROR: Resetting offsets failed.");
+            throw e;
         }
 
         System.out.println("Done.");
     }
 
     private boolean isInputTopic(final String topic) {
-        return this.options.valuesOf(inputTopicsOption).contains(topic);
+        return options.valuesOf(inputTopicsOption).contains(topic);
     }
 
-    private void seekToEndIntermediateTopics() {
-        final List<String> intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
-
-        if (intermediateTopics.size() == 0) {
-            System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
-            return;
-        }
-
-        System.out.println("Seek-to-end for intermediate user topics " + intermediateTopics);
-
-        final Properties config = new Properties();
-        config.putAll(this.consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.options.valueOf(bootstrapServerOption));
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, this.options.valueOf(applicationIdOption));
-        config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        for (final String topic : intermediateTopics) {
-            if (this.allTopics.contains(topic)) {
-                System.out.println("Topic: " + topic);
-
-                try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-                    client.subscribe(Collections.singleton(topic));
-                    client.poll(1);
-
-                    final Set<TopicPartition> partitions = client.assignment();
-                    client.seekToEnd(partitions);
-                    for (final TopicPartition p : partitions) {
-                        client.position(p);
-                    }
-                    client.commitSync();
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Seek-to-end for topic " + topic + " failed.");
-                    throw e;
-                }
-            } else {
-                System.out.println("Topic " + topic + " not found. Skipping.");
-            }
-        }
-
-        System.out.println("Done.");
+    private boolean isIntermediateTopic(final String topic) {
+        return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
     private void deleteInternalTopics(final ZkUtils zkUtils) {
-        System.out.println("Deleting all internal/auto-created topics for application " + this.options.valueOf(applicationIdOption));
+        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
 
-        for (final String topic : this.allTopics) {
+        for (final String topic : allTopics) {
             if (isInternalTopic(topic)) {
                 final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new String[]{
-                    "--zookeeper", this.options.valueOf(zookeeperOption),
+                    "--zookeeper", options.valueOf(zookeeperOption),
                     "--delete", "--topic", topic});
                 try {
                     TopicCommand.deleteTopic(zkUtils, commandOptions);
@@ -269,7 +262,7 @@ public class StreamsResetter {
     }
 
     private boolean isInternalTopic(final String topicName) {
-        return topicName.startsWith(this.options.valueOf(applicationIdOption) + "-")
+        return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
             && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
     }
 


[2/5] kafka git commit: HOTFIX: follow up on KAFKA-4275

Posted by gu...@apache.org.
HOTFIX: follow up on KAFKA-4275

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2039 from mjsax/hotfix-ktableLeftJoin
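
The fix matters for chained KTable-KTable joins, where the join processors must be connected to the state stores reported by valueGetterSupplier().storeNames(); the new integration test builds exactly such a chain. A sketch of that topology (topic names and the string-concatenating joiner follow the test below):

    final KStreamBuilder builder = new KStreamBuilder();
    final KTable<String, String> table1 = builder.table("table1", "table1");
    final KTable<String, String> table2 = builder.table("table2", "table2");
    final KTable<String, String> table3 = builder.table("table3", "table3");

    final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
        @Override
        public String apply(final String value1, final String value2) {
            return value1 + "-" + value2;
        }
    };

    // two chained joins; before the fix the second join's processors could be
    // wired to the wrong (or missing) state store
    table1.join(table2, joiner).join(table3, joiner).to("output-");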


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a24d3a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a24d3a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a24d3a2

Branch: refs/heads/0.10.1
Commit: 0a24d3a258a544314d8ea995157a351f9ca8994e
Parents: 4fdbc72
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 19 20:58:18 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:18:15 2016 -0800

----------------------------------------------------------------------
 .../streams/kstream/internals/KTableImpl.java   |  14 +-
 .../KTableKTableJoinIntegrationTest.java        | 280 +++++++++++++++++++
 2 files changed, 287 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0a24d3a2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 7ce0bbb..06feb0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -21,10 +21,10 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -297,8 +297,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
-        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }
@@ -325,8 +325,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
-        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }
@@ -352,8 +352,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         topology.addProcessor(joinThisName, joinThis, this.name);
         topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
         topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
-        topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
-        topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
+        topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames());
+        topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames());
 
         return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a24d3a2/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
new file mode 100644
index 0000000..85e2cf7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+@RunWith(Parameterized.class)
+public class KTableKTableJoinIntegrationTest {
+    private final static int NUM_BROKERS = 1;
+
+    @ClassRule
+    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final static MockTime MOCK_TIME = CLUSTER.time;
+    private final static String TABLE_1 = "table1";
+    private final static String TABLE_2 = "table2";
+    private final static String TABLE_3 = "table3";
+    private final static String OUTPUT = "output-";
+    private static Properties streamsConfig;
+    private KafkaStreams streams;
+    private final static Properties CONSUMER_CONFIG = new Properties();
+
+    @Parameterized.Parameter(value = 0)
+    public JoinType joinType1;
+    @Parameterized.Parameter(value = 1)
+    public JoinType joinType2;
+    @Parameterized.Parameter(value = 2)
+    public List<KeyValue<String, String>> expectedResult;
+
+    //Single parameter, use Object[]
+    @Parameterized.Parameters
+    public static Object[] parameters() {
+        return new Object[][]{
+            {JoinType.INNER, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null))
+            },
+            {JoinType.INNER, JoinType.LEFT, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null)
+            )},
+            {JoinType.INNER, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C3")
+            )},
+            {JoinType.LEFT, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null)
+            )},
+            {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", null)
+            )},
+            {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C3")
+            )},
+            {JoinType.OUTER, JoinType.INNER, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")
+            )},
+            {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
+                new KeyValue<>("a", null),
+                new KeyValue<>("b", null),
+                new KeyValue<>("c", null),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")
+            )},
+            {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
+                new KeyValue<>("a", "null-A3"),
+                new KeyValue<>("b", "null-B3"),
+                new KeyValue<>("c", "null-C3"),
+                new KeyValue<>("a", "A1-null-A3"),
+                new KeyValue<>("b", "B1-null-B3"),
+                new KeyValue<>("b", "B1-B2-B3"),
+                new KeyValue<>("c", "null-C2-C3")
+            )}
+        };
+    }
+
+    public static Object[] data() {
+        return new Object[]{0, 10 * 1024 * 1024L};
+    }
+
+    @BeforeClass
+    public static void beforeTest() throws Exception {
+        CLUSTER.createTopic(TABLE_1);
+        CLUSTER.createTopic(TABLE_2);
+        CLUSTER.createTopic(TABLE_3);
+        CLUSTER.createTopic(OUTPUT);
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfig.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        final List<KeyValue<String, String>> table1 = Arrays.asList(
+            new KeyValue<>("a", "A1"),
+            new KeyValue<>("b", "B1")
+        );
+
+        final List<KeyValue<String, String>> table2 = Arrays.asList(
+            new KeyValue<>("b", "B2"),
+            new KeyValue<>("c", "C2")
+        );
+
+        final List<KeyValue<String, String>> table3 = Arrays.asList(
+            new KeyValue<>("a", "A3"),
+            new KeyValue<>("b", "B3"),
+            new KeyValue<>("c", "C3")
+        );
+
+        // put table 3 first, to make sure data is there when joining T1 with T2
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
+        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
+
+
+        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
+        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    }
+
+    @Before
+    public void before() throws Exception {
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close();
+            streams = null;
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
+    }
+
+    private enum JoinType {
+        INNER, LEFT, OUTER
+    }
+
+    private KafkaStreams prepareTopology() {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
+        final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
+        final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
+
+        join(join(table1, table2, joinType1), table3, joinType2).to(OUTPUT);
+
+        return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
+    }
+
+    private KTable<String, String> join(KTable<String, String> first, KTable<String, String> second, JoinType joinType) {
+        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+            @Override
+            public String apply(final String value1, final String value2) {
+                return value1 + "-" + value2;
+            }
+        };
+
+        switch (joinType) {
+            case INNER:
+                return first.join(second, joiner);
+            case LEFT:
+                return first.leftJoin(second, joiner);
+            case OUTER:
+                return first.outerJoin(second, joiner);
+        }
+
+        throw new RuntimeException("Unknown join type.");
+    }
+
+    @Test
+    public void KTableKTableJoin() throws Exception {
+        System.out.println("join: " + joinType1 + "-" + joinType2);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join");
+
+        streams = prepareTopology();
+        streams.start();
+
+
+        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            CONSUMER_CONFIG,
+            OUTPUT,
+            expectedResult.size());
+
+        assertThat(result, equalTo(expectedResult));
+    }
+
+}


[5/5] kafka git commit: KAFKA-4361: Streams does not respect user configs for "default" params

Posted by gu...@apache.org.
KAFKA-4361: Streams does not respect user configs for "default" params

Enable user-provided consumer and producer configs to override the Streams default configs.

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2084 from dguy/kafka-4361
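
Usage-wise, prefixed client configs now take precedence over the Streams defaults. A short example in the spirit of the new tests, using StreamsConfig.consumerPrefix / producerPrefix (application id and bootstrap servers are placeholder values):

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

    // these user values now override the built-in defaults instead of being replaced by them
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
    props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");

    final StreamsConfig config = new StreamsConfig(props);
    // getConsumerConfigs(...) / getProducerConfigs(...) now report "10" and "10000"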


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f91d95ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f91d95ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f91d95ac

Branch: refs/heads/0.10.1
Commit: f91d95ac9aa6c5fe9b7ee091b7759fee53e465d4
Parents: 34f987f
Author: Damian Guy <da...@gmail.com>
Authored: Tue Nov 1 10:07:58 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:28:24 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 21 ++++++----
 .../apache/kafka/streams/StreamsConfigTest.java | 43 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f91d95ac/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 66c15b9..5ba4383 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -321,16 +321,19 @@ public class StreamsConfig extends AbstractConfig {
      * @throws ConfigException
      */
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
-        final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+
+        final Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+        final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
-        if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+        if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                     + ", as the streams client will always turn off auto committing.");
         }
 
-        consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -362,16 +365,18 @@ public class StreamsConfig extends AbstractConfig {
      * @throws ConfigException
      */
     public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException {
-        Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+        Map<String, Object> consumerProps = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
+
+        final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
 
         // disable auto commit and throw exception if there is user overridden values,
         // this is necessary for streams commit semantics
-        if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+        if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
             throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
                     + ", as the streams client will always turn off auto committing.");
         }
 
-        consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -396,8 +401,8 @@ public class StreamsConfig extends AbstractConfig {
      */
     public Map<String, Object> getProducerConfigs(String clientId) {
         // generate producer configs from original properties and overridden maps
-        final Map<String, Object> props = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-        props.putAll(PRODUCER_DEFAULT_OVERRIDES);
+        final Map<String, Object> props = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
+        props.putAll(getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames()));
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
         // add client id with stream client id prefix

http://git-wip-us.apache.org/repos/asf/kafka/blob/f91d95ac/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3caa767..f03bed9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -214,6 +215,48 @@ public class StreamsConfigTest {
         streamsConfig.valueSerde();
     }
 
+    @Test
+    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
+    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
+        assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
+    }
+
+    @Test
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+        assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getConsumerConfigs(null, "a", "b");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getRestoreConsumerConfigs("client");
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {


[4/5] kafka git commit: MINOR: improve JavaDoc for Streams window retention time

Posted by gu...@apache.org.
MINOR: improve JavaDoc for Streams window retention time

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2068 from mjsax/hotfixImproveWindowRetentionTimeJavaDoc


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34f987f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34f987f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34f987f6

Branch: refs/heads/0.10.1
Commit: 34f987f60a49eb12b67d88fbbac35a2081dd4081
Parents: 2dedd8d
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 26 12:57:57 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 07:27:54 2016 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kafka/streams/kstream/Windows.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/34f987f6/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index f060d39..5a4f0d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -40,7 +40,8 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
-     * Set the window maintain duration in milliseconds of system time.
+     * Set the window maintain duration in milliseconds of streams time.
+     * This retention time is a guaranteed <i>lower bound</i> for how long a window will be maintained.
      *
      * @return  itself
      */
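
For context, that retention time is configured through Windows#until; a minimal fragment, assuming the 0.10.1 TimeWindows factory:

    // 5-minute windows retained for at least one hour of streams time;
    // per the clarified JavaDoc, the retention is a lower bound, not an exact expiry
    TimeWindows.of(5 * 60 * 1000L).until(60 * 60 * 1000L);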