You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/06 21:09:52 UTC
[kafka] branch 2.5 updated: KAFKA-9668: Iterating over
KafkaStreams.getAllMetadata() results in ConcurrentModificationException
(#8233)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new e6d0d6e KAFKA-9668: Iterating over KafkaStreams.getAllMetadata() results in ConcurrentModificationException (#8233)
e6d0d6e is described below
commit e6d0d6ea3dff80440ffc72fec288debadbd05020
Author: Andy Coates <80...@users.noreply.github.com>
AuthorDate: Fri Mar 6 21:08:52 2020 +0000
KAFKA-9668: Iterating over KafkaStreams.getAllMetadata() results in ConcurrentModificationException (#8233)
`KafkaStreams.getAllMetadata()` returns `StreamsMetadataState.getAllMetadata()`. Although the latter method is `synchronized`, it returns a reference to internal mutable state. Not only does this break encapsulation, but it also means that any thread iterating over the returned collection while the metadata is being rebuilt will encounter a `ConcurrentModificationException`.
This change:
* switches from clearing and rebuilding `allMetadata` when `onChange` is called to building a new list and swapping it in. This is thread safe and has the benefit that the returned list is not empty during a rebuild: callers get either the old list or the new one.
* removes synchronisation from `getAllMetadata` and `getLocalMetadata`. These methods simply return member variables, so synchronisation adds nothing.
* changes `getAllMetadata` to wrap its return value in an unmodifiable wrapper to avoid breaking encapsulation.
* changes the getters in `StreamsMetadata` to wrap their return values in unmodifiable wrappers to avoid breaking encapsulation.
Co-authored-by: Andy Coates <bi...@users.noreply.github.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/StreamsMetadataState.java | 21 ++++---
.../kafka/streams/state/StreamsMetadata.java | 8 +--
.../internals/StreamsMetadataStateTest.java | 26 +++++++++
.../kafka/streams/state/StreamsMetadataTest.java | 64 ++++++++++++++++++++++
4 files changed, 106 insertions(+), 13 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index c746059..154d795 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
@@ -47,9 +46,9 @@ import java.util.Set;
public class StreamsMetadataState {
public static final HostInfo UNKNOWN_HOST = new HostInfo("unknown", -1);
private final InternalTopologyBuilder builder;
- private final List<StreamsMetadata> allMetadata = new ArrayList<>();
private final Set<String> globalStores;
private final HostInfo thisHost;
+ private List<StreamsMetadata> allMetadata = Collections.emptyList();
private Cluster clusterMetadata;
private StreamsMetadata localMetadata;
@@ -79,7 +78,7 @@ public class StreamsMetadataState {
*
* @return the {@link StreamsMetadata}s for the local instance in a {@link KafkaStreams} application
*/
- public synchronized StreamsMetadata getLocalMetadata() {
+ public StreamsMetadata getLocalMetadata() {
return localMetadata;
}
@@ -89,8 +88,8 @@ public class StreamsMetadataState {
*
* @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
*/
- public synchronized Collection<StreamsMetadata> getAllMetadata() {
- return allMetadata;
+ public Collection<StreamsMetadata> getAllMetadata() {
+ return Collections.unmodifiableList(allMetadata);
}
/**
@@ -316,10 +315,12 @@ public class StreamsMetadataState {
private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap) {
- allMetadata.clear();
if (activePartitionHostMap.isEmpty() && standbyPartitionHostMap.isEmpty()) {
+ allMetadata = Collections.emptyList();
return;
}
+
+ final List<StreamsMetadata> rebuiltMetadata = new ArrayList<>();
final Map<String, List<String>> storeToSourceTopics = builder.stateStoreNameToSourceTopics();
Stream.concat(activePartitionHostMap.keySet().stream(), standbyPartitionHostMap.keySet().stream())
.distinct()
@@ -344,11 +345,13 @@ public class StreamsMetadataState {
activePartitionsOnHost,
standbyStoresOnHost,
standbyPartitionsOnHost);
- allMetadata.add(metadata);
+ rebuiltMetadata.add(metadata);
if (hostInfo.equals(thisHost)) {
localMetadata = metadata;
}
});
+
+ allMetadata = rebuiltMetadata;
}
private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
@@ -409,8 +412,8 @@ public class StreamsMetadataState {
}
private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
- final List<String> sourceTopics = builder.sourceTopicsForStore(storeName).stream().collect(Collectors.toList());
- if (sourceTopics == null || sourceTopics.isEmpty()) {
+ final List<String> sourceTopics = new ArrayList<>(builder.sourceTopicsForStore(storeName));
+ if (sourceTopics.isEmpty()) {
return null;
}
return new SourceTopicsInfo(sourceTopics);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
index 50c2d68..b1d3f4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StreamsMetadata.java
@@ -80,7 +80,7 @@ public class StreamsMetadata {
* @return set of active state store names
*/
public Set<String> stateStoreNames() {
- return stateStoreNames;
+ return Collections.unmodifiableSet(stateStoreNames);
}
/**
@@ -89,7 +89,7 @@ public class StreamsMetadata {
* @return set of active topic partitions
*/
public Set<TopicPartition> topicPartitions() {
- return topicPartitions;
+ return Collections.unmodifiableSet(topicPartitions);
}
/**
@@ -98,7 +98,7 @@ public class StreamsMetadata {
* @return set of standby topic partitions
*/
public Set<TopicPartition> standbyTopicPartitions() {
- return standbyTopicPartitions;
+ return Collections.unmodifiableSet(standbyTopicPartitions);
}
/**
@@ -107,7 +107,7 @@ public class StreamsMetadata {
* @return set of standby state store names
*/
public Set<String> standbyStateStoreNames() {
- return standbyStateStoreNames;
+ return Collections.unmodifiableSet(standbyStateStoreNames);
}
public String host() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e165622..d38b6c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -35,6 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -49,6 +50,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
public class StreamsMetadataStateTest {
@@ -339,4 +341,28 @@ public class StreamsMetadataStateTest {
streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, cluster);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner));
}
+
+ @Test
+ public void shouldReturnAllMetadataThatRemainsValidAfterChange() {
+ final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
+ final Collection<StreamsMetadata> copy = new ArrayList<>(allMetadata);
+ assertFalse("invalid test", allMetadata.isEmpty());
+ metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), cluster);
+ assertEquals("encapsulation broken", allMetadata, copy);
+ }
+
+ @Test
+ public void shouldNotReturnMutableReferenceToInternalAllMetadataCollection() {
+ final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
+ assertFalse("invalid test", allMetadata.isEmpty());
+
+ try {
+ // Either this should not affect internal state of 'metadataState'
+ allMetadata.clear();
+ } catch (final UnsupportedOperationException e) {
+ // Or should fail.
+ }
+
+ assertFalse("encapsulation broken", metadataState.getAllMetadata().isEmpty());
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
new file mode 100644
index 0000000..98022bb
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StreamsMetadataTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.Assert.assertTrue;
+
+public class StreamsMetadataTest {
+
+ private static final HostInfo HOST_INFO = new HostInfo("local", 12);
+ private static final TopicPartition TP_0 = new TopicPartition("t", 0);
+ private static final TopicPartition TP_1 = new TopicPartition("t", 1);
+
+ private StreamsMetadata streamsMetadata;
+
+ @Before
+ public void setUp() {
+ streamsMetadata = new StreamsMetadata(
+ HOST_INFO,
+ mkSet("store1", "store2"),
+ mkSet(TP_0, TP_1),
+ mkSet("store2"),
+ mkSet(TP_1)
+ );
+ }
+
+ @Test
+ public void shouldNotAllowModificationOfInternalStateViaGetters() {
+ assertTrue(isUnmodifiable(streamsMetadata.stateStoreNames()));
+ assertTrue(isUnmodifiable(streamsMetadata.topicPartitions()));
+ assertTrue(isUnmodifiable(streamsMetadata.standbyTopicPartitions()));
+ assertTrue(isUnmodifiable(streamsMetadata.standbyStateStoreNames()));
+ }
+
+ private static boolean isUnmodifiable(final Collection<?> collection) {
+ try {
+ collection.clear();
+ return false;
+ } catch (final UnsupportedOperationException e) {
+ return true;
+ }
+ }
+}
\ No newline at end of file