You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/20 00:47:45 UTC

[1/2] kafka git commit: Cherry-pick KAFKA-3941: Delay eviction listener in InMemoryKeyValueLoggedStore after restoration

Repository: kafka
Updated Branches:
  refs/heads/0.10.0 0bb1d3ae5 -> a7f9396b0


Cherry-pick KAFKA-3941: Delay eviction listener in InMemoryKeyValueLoggedStore after restoration


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7bd06e92
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7bd06e92
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7bd06e92

Branch: refs/heads/0.10.0
Commit: 7bd06e9255544c3b9c536b6b9417e4bb52c830b3
Parents: 4091a13
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jul 13 18:11:25 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 19 17:46:50 2016 -0700

----------------------------------------------------------------------
 .../internals/InMemoryKeyValueLoggedStore.java  | 27 ++++++++-------
 .../InMemoryKeyValueStoreSupplier.java          | 26 +++++++++++++--
 .../InMemoryLRUCacheStoreSupplier.java          | 15 +++------
 .../streams/state/internals/MemoryLRUCache.java | 35 +++++++++++++++++---
 .../internals/MemoryNavigableLRUCache.java      | 10 +++---
 .../internals/RocksDBKeyValueStoreSupplier.java |  4 ++-
 .../streams/state/internals/RocksDBStore.java   |  2 +-
 .../internals/RocksDBWindowStoreSupplier.java   |  4 ++-
 .../processor/internals/StandbyTaskTest.java    |  4 ---
 9 files changed, 84 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index efcdac7..2c45d1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,7 +34,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     private final Serde<V> valueSerde;
     private final String storeName;
 
-    private StateSerdes<K, V> serdes;
     private StoreChangeLogger<K, V> changeLogger;
     private StoreChangeLogger.ValueGetter<K, V> getter;
 
@@ -54,30 +52,31 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
+        inner.init(context, root);
+
         // construct the serde
-        this.serdes = new StateSerdes<>(storeName,
+        StateSerdes<K, V>  serdes = new StateSerdes<>(storeName,
                 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                 valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
-        context.register(root, true, new StateRestoreCallback() {
-            @Override
-            public void restore(byte[] key, byte[] value) {
-
-                // directly call inner functions so that the operation is not logged
-                inner.put(serdes.keyFrom(key), serdes.valueFrom(value));
-            }
-        });
-
-        inner.init(context, root);
-
         this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
             @Override
             public V get(K key) {
                 return inner.get(key);
             }
         };
+
+        // if the inner store is an LRU cache, add the eviction listener to log removed record
+        if (inner instanceof MemoryLRUCache) {
+            ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
+                @Override
+                public void apply(K key, V value) {
+                    removed(key);
+                }
+            });
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index a25153c..8b498a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.Iterator;
 import java.util.List;
@@ -67,7 +69,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new MemoryStore<>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
+        MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde);
+
+        return new MeteredKeyValueStore<>(store.enableLogging(), "in-memory-state", time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -76,6 +80,8 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         private final Serde<V> valueSerde;
         private final NavigableMap<K, V> map;
 
+        private StateSerdes<K, V> serdes;
+
         public MemoryStore(String name, Serde<K> keySerde, Serde<V> valueSerde) {
             this.name = name;
             this.keySerde = keySerde;
@@ -98,7 +104,23 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         @Override
         @SuppressWarnings("unchecked")
         public void init(ProcessorContext context, StateStore root) {
-            // do nothing
+            // construct the serde
+            this.serdes = new StateSerdes<>(name,
+                    keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                    valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+            // register the store
+            context.register(root, true, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    // check value for null, to avoid  deserialization error.
+                    if (value == null) {
+                        put(serdes.keyFrom(key), null);
+                    } else {
+                        put(serdes.keyFrom(key), serdes.valueFrom(value));
+                    }
+                }
+            });
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 4a4fa5f..20a7333 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -52,17 +52,10 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
         return name;
     }
 
-    @SuppressWarnings("unchecked")
     public StateStore get() {
-        final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K, V>(name, capacity);
-        final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(keySerde, valueSerde);
-        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
-        cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
-            @Override
-            public void apply(K key, V value) {
-                loggedCache.removed(key);
-            }
-        });
-        return store;
+        MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name, capacity, keySerde, valueSerde);
+        InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore<K, V>) cache.enableLogging();
+
+        return new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index d410e02..1899389 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -48,16 +50,25 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
         void apply(K key, V value);
     }
 
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+
     protected String name;
     protected Map<K, V> map;
     protected Set<K> keys;
+    private StateSerdes<K, V> serdes;
 
     protected EldestEntryRemovalListener<K, V> listener;
 
     // this is used for extended MemoryNavigableLRUCache only
-    public MemoryLRUCache() {}
+    public MemoryLRUCache(Serde<K> keySerde, Serde<V> valueSerde) {
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    public MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
+        this(keySerde, valueSerde);
 
-    public MemoryLRUCache(String name, final int maxCacheSize) {
         this.name = name;
         this.keys = new HashSet<>();
 
@@ -78,7 +89,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
         };
     }
 
-    public KeyValueStore<K, V> enableLogging(Serde<K> keySerde, Serde<V> valueSerde) {
+    public KeyValueStore<K, V> enableLogging() {
         return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
     }
 
@@ -96,7 +107,23 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        // do nothing
+        // construct the serde
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        // register the store
+        context.register(root, true, new StateRestoreCallback() {
+            @Override
+            public void restore(byte[] key, byte[] value) {
+                // check value for null, to avoid  deserialization error.
+                if (value == null) {
+                    put(serdes.keyFrom(key), null);
+                } else {
+                    put(serdes.keyFrom(key), serdes.valueFrom(value));
+                }
+            }
+        });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 99bac93..5eb4f49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
@@ -27,8 +28,8 @@ import java.util.TreeSet;
 
 public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
 
-    public MemoryNavigableLRUCache(String name, final int maxCacheSize) {
-        super();
+    public MemoryNavigableLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V> valueSerde) {
+        super(keySerde, valueSerde);
 
         this.name = name;
         this.keys = new TreeSet<>();
@@ -57,15 +58,14 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
         return this;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
-        return new MemoryNavigableLRUCache.CacheIterator<K, V>(((NavigableSet) this.keys).subSet(from, true, to, false).iterator(), this.map);
+        return new MemoryNavigableLRUCache.CacheIterator<>(((NavigableSet<K>) this.keys).subSet(from, true, to, false).iterator(), this.map);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MemoryNavigableLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+        return new MemoryNavigableLRUCache.CacheIterator<>(this.keys.iterator(), this.map);
     }
 
     private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index af98733..16111ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -53,6 +53,8 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde, valueSerde).enableLogging(), "rocksdb-state", time);
+        RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
+
+        return new MeteredKeyValueStore<>(store.enableLogging(), "rocksdb-state", time);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8f3bab0..73110f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -173,7 +173,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
 
         if (this.cacheSize > 0) {
-            this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
+            this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize, null, null)
                     .whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K, RocksDBCacheEntry>() {
                         @Override
                         public void apply(K key, RocksDBCacheEntry entry) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 0407299..3a1bd59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -59,7 +59,9 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde).enableLogging(), "rocksdb-window", time);
+        RocksDBWindowStore<K, V> store = new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
+
+        return new MeteredWindowStore<>(store.enableLogging(), "rocksdb-window", time);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7bd06e92/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e7fb9a4..9e15e1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -115,15 +115,11 @@ public class StandbyTaskTest {
                 new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])
         ));
 
-        System.out.println("added " + storeChangelogTopicName1);
-
         restoreStateConsumer.updatePartitions(storeChangelogTopicName2, Utils.mkList(
                 new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]),
                 new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]),
                 new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
         ));
-
-        System.out.println("added " + storeChangelogTopicName2);
     }
 
     @Test


[2/2] kafka git commit: Merge branch '0.10.0' of https://git-wip-us.apache.org/repos/asf/kafka into 0.10.0

Posted by gu...@apache.org.
Merge branch '0.10.0' of https://git-wip-us.apache.org/repos/asf/kafka into 0.10.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7f9396b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7f9396b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7f9396b

Branch: refs/heads/0.10.0
Commit: a7f9396b07a8be2e2e7d43a8ac31f39c66a0f8da
Parents: 7bd06e9 0bb1d3a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Jul 19 17:47:17 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 19 17:47:17 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |  4 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  1 +
 .../consumer/internals/ConsumerCoordinator.java | 12 ++-
 .../consumer/internals/SubscriptionState.java   | 41 +++++++---
 .../kafka/clients/producer/ProducerConfig.java  | 16 ++--
 .../main/scala/kafka/log/FileMessageSet.scala   | 20 +++--
 .../kafka/tools/VerifyConsumerRebalance.scala   |  3 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 83 +++++++++++++++++++-
 .../unit/kafka/log/FileMessageSetTest.scala     | 67 +++++++++++++++-
 docs/ops.html                                   | 22 ++++--
 docs/quickstart.html                            | 29 +++++--
 .../apache/kafka/streams/kstream/Windows.java   |  4 +-
 .../kstream/internals/AbstractStream.java       |  6 ++
 .../streams/kstream/internals/KTableFilter.java |  3 +
 .../kstream/internals/KTableRepartitionMap.java |  5 +-
 .../streams/processor/TopologyBuilder.java      | 38 ++++-----
 .../processor/internals/RecordCollector.java    | 29 ++++---
 .../streams/state/internals/RocksDBStore.java   | 21 +++--
 .../kstream/internals/KTableFilterTest.java     | 39 ++++++++-
 .../streams/smoketest/SmokeTestDriver.java      |  4 +-
 .../kafka/streams/smoketest/SmokeTestUtil.java  |  2 +-
 .../kafka/test/MockProcessorSupplier.java       |  6 ++
 tests/kafkatest/services/kafka/kafka.py         | 13 +--
 .../services/kafka/templates/kafka.properties   |  4 +-
 tests/kafkatest/services/security/minikdc.py    |  2 +-
 .../services/security/security_config.py        | 40 ++++++----
 tests/kafkatest/services/zookeeper.py           |  3 +-
 .../core/zookeeper_security_upgrade_test.py     |  2 +-
 28 files changed, 400 insertions(+), 119 deletions(-)
----------------------------------------------------------------------