You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/06 21:09:52 UTC

[kafka] branch 2.5 updated: KAFKA-9668: Iterating over KafkaStreams.getAllMetadata() results in ConcurrentModificationException (#8233)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new e6d0d6e  KAFKA-9668: Iterating over KafkaStreams.getAllMetadata() results in ConcurrentModificationException (#8233)
e6d0d6e is described below

commit e6d0d6ea3dff80440ffc72fec288debadbd05020
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Fri Mar 6 21:08:52 2020 +0000

    KAFKA-9668: Iterating over KafkaStreams.getAllMetadata() results in ConcurrentModificationException (#8233)
    
    `KafkaStreams.getAllMetadata()` returns `StreamsMetadataState.getAllMetadata()`. Although the latter method is `synchronized`, it returns a reference to internal mutable state.  Not only does this break encapsulation, but it means any thread iterating over the returned collection when the metadata gets rebuilt can encounter a `ConcurrentModificationException`.
    
    This change:
     * switches from clearing and rebuilding `allMetadata` when `onChange` is called to building a new list and swapping it in. This is thread safe and has the benefit that the returned list is not empty during a rebuild: you either get the old or the new list.
     * removes synchronisation from `getAllMetadata` and `getLocalMetadata`. These methods only return member variables, so synchronisation adds nothing.
     * changes `getAllMetadata` to wrap its return value in an unmodifiable wrapper to avoid breaking encapsulation.
     * changes the getters in `StreamsMetadata` to wrap their return values in unmodifiable wrappers to avoid breaking encapsulation.
    
    Co-authored-by: Andy Coates <bi...@users.noreply.github.com>
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/StreamsMetadataState.java  | 21 ++++---
 .../kafka/streams/state/StreamsMetadata.java       |  8 +--
 .../internals/StreamsMetadataStateTest.java        | 26 +++++++++
 .../kafka/streams/state/StreamsMetadataTest.java   | 64 ++++++++++++++++++++++
 4 files changed, 106 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index c746059..154d795 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
@@ -47,9 +46,9 @@ import java.util.Set;
 public class StreamsMetadataState {
     public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1);
     private final InternalTopologyBuilder builder;
-    private final List<StreamsMetadata> allMetadata = new ArrayList<>();
     private final Set<String> globalStores;
     private final HostInfo thisHost;
+    private List<StreamsMetadata> allMetadata = Collections.emptyList();
     private Cluster clusterMetadata;
     private StreamsMetadata localMetadata;
 
@@ -79,7 +78,7 @@ public class StreamsMetadataState {
      *
      * @return the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application
      */
-    public synchronized StreamsMetadata getLocalMetadata() {
+    public StreamsMetadata getLocalMetadata() {
         return localMetadata;
     }
 
@@ -89,8 +88,8 @@ public class StreamsMetadataState {
      *
      * @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
      */
-    public synchronized Collection<StreamsMetadata> getAllMetadata() {
-        return allMetadata;
+    public Collection<StreamsMetadata> getAllMetadata() {
+        return Collections.unmodifiableList(allMetadata);
     }
     
     /**
@@ -316,10 +315,12 @@ public class StreamsMetadataState {
 
     private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                  final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
-        allMetadata.clear();
         if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
+            allMetadata = Collections.emptyList();
             return;
         }
+
+        final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
         final Map<String, List<String>> storeToSourceTopics = builder.stateStoreNameToSourceTopics();
         Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
             .distinct()
@@ -344,11 +345,13 @@ public class StreamsMetadataState {
                                                                      activePartitionsOnHost,
                                                                      standbyStoresOnHost,
                                                                      standbyPartitionsOnHost);
-                allMetadata.add(metadata);
+                rebuiltMetadata.add(metadata);
                 if (hostInfo.equals(thisHost)) {
                     localMetadata = metadata;
                 }
             });
+
+        allMetadata = rebuiltMetadata;
     }
 
     private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
@@ -409,8 +412,8 @@ public class StreamsMetadataState {
     }
 
     private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
-        final List<String> sourceTopics = builder.sourceTopicsForStore(storeName).stream().collect(Collectors.toList());
-        if (sourceTopics == null || sourceTopics.isEmpty()) {
+        final List<String> sourceTopics = new ArrayList<>(builder.sourceTopicsForStore(storeName));
+        if (sourceTopics.isEmpty()) {
             return null;
         }
         return new SourceTopicsInfo(sourceTopics);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
index 50c2d68..b1d3f4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -80,7 +80,7 @@ public class StreamsMetadata {
      * @return set of active state store names
      */
     public Set<String> stateStoreNames() {
-        return stateStoreNames;
+        return Collections.unmodifiableSet(stateStoreNames);
     }
 
     /**
@@ -89,7 +89,7 @@ public class StreamsMetadata {
      * @return set of active topic partitions
      */
     public Set<TopicPartition> topicPartitions() {
-        return topicPartitions;
+        return Collections.unmodifiableSet(topicPartitions);
     }
 
     /**
@@ -98,7 +98,7 @@ public class StreamsMetadata {
      * @return set of standby topic partitions
      */
     public Set<TopicPartition> standbyTopicPartitions() {
-        return standbyTopicPartitions;
+        return Collections.unmodifiableSet(standbyTopicPartitions);
     }
 
     /**
@@ -107,7 +107,7 @@ public class StreamsMetadata {
      * @return set of standby state store names
      */
     public Set<String> standbyStateStoreNames() {
-        return standbyStateStoreNames;
+        return Collections.unmodifiableSet(standbyStateStoreNames);
     }
 
     public String host() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e165622..d38b6c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -35,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,6 +50,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 public class StreamsMetadataStateTest {
 
@@ -339,4 +341,28 @@ public class StreamsMetadataStateTest {
         streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, cluster);
         assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner));
     }
+
+    @Test
+    public void shouldReturnAllMetadataThatRemainsValidAfterChange() {
+        final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
+        final Collection<StreamsMetadata> copy = new ArrayList<>(allMetadata);
+        assertFalse("invalid test", allMetadata.isEmpty());
+        metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), cluster);
+        assertEquals("encapsulation broken", allMetadata, copy);
+    }
+
+    @Test
+    public void shouldNotReturnMutableReferenceToInternalAllMetadataCollection() {
+        final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
+        assertFalse("invalid test", allMetadata.isEmpty());
+
+        try {
+            // Either this should not affect internal state of 'metadataState'
+            allMetadata.clear();
+        } catch (final UnsupportedOperationException e) {
+            // Or should fail.
+        }
+
+        assertFalse("encapsulation broken", metadataState.getAllMetadata().isEmpty());
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
new file mode 100644
index 0000000..98022bb
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertTrue;
+
+public class StreamsMetadataTest {
+
+    private static final HostInfo HOST_INFO = new HostInfo("local", 12);
+    private static final TopicPartition TP_0 = new TopicPartition("t", 0);
+    private static final TopicPartition TP_1 = new TopicPartition("t", 1);
+
+    private StreamsMetadata streamsMetadata;
+
+    @Before
+    public void setUp() {
+        streamsMetadata = new StreamsMetadata(
+            HOST_INFO,
+            mkSet("store1", "store2"),
+            mkSet(TP_0, TP_1),
+            mkSet("store2"),
+            mkSet(TP_1)
+        );
+    }
+
+    @Test
+    public void shouldNotAllowModificationOfInternalStateViaGetters() {
+        assertTrue(isUnmodifiable(streamsMetadata.stateStoreNames()));
+        assertTrue(isUnmodifiable(streamsMetadata.topicPartitions()));
+        assertTrue(isUnmodifiable(streamsMetadata.standbyTopicPartitions()));
+        assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
+    }
+
+    private static boolean isUnmodifiable(final Collection<?> collection) {
+        try {
+            collection.clear();
+            return false;
+        } catch (final UnsupportedOperationException e) {
+            return true;
+        }
+    }
+}
\ No newline at end of file