You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/15 23:16:15 UTC
[kafka] 02/02: Removing Multicasting partitioner for IQ (#12977)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2e1d8139a9b450ea0f8787de94577f391ac10dd9
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Fri Dec 16 04:39:41 2022 +0530
Removing Multicasting partitioner for IQ (#12977)
Follow-up PR for KIP-837. Multicasting should not be allowed for Interactive Queries (IQ); this PR enforces that restriction.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../org/apache/kafka/streams/KeyQueryMetadata.java | 24 +-----
.../processor/internals/StreamsMetadataState.java | 45 +++++------
.../integration/StoreQueryIntegrationTest.java | 87 +++++++++-------------
.../internals/StreamsMetadataStateTest.java | 25 +++----
4 files changed, 66 insertions(+), 115 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
index 6461ee7423f..9ca495214d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
@@ -43,20 +43,10 @@ public class KeyQueryMetadata {
private final int partition;
- private final Set<Integer> partitions;
-
public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHosts, final int partition) {
this.activeHost = activeHost;
this.standbyHosts = standbyHosts;
this.partition = partition;
- this.partitions = Collections.singleton(partition);
- }
-
- public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHosts, final Set<Integer> partitions) {
- this.activeHost = activeHost;
- this.standbyHosts = standbyHosts;
- this.partition = partitions.size() == 1 ? partitions.iterator().next() : -1;
- this.partitions = partitions;
}
/**
@@ -119,16 +109,6 @@ public class KeyQueryMetadata {
return partition;
}
- /**
- * Get the store partitions corresponding to the key.
- * A Key can be on multiple partitions if it has been
- * multicasted using StreamPartitioner#partitions method
- * @return store partition number
- */
- public Set<Integer> partitions() {
- return partitions;
- }
-
@Override
public boolean equals(final Object obj) {
if (!(obj instanceof KeyQueryMetadata)) {
@@ -137,8 +117,7 @@ public class KeyQueryMetadata {
final KeyQueryMetadata keyQueryMetadata = (KeyQueryMetadata) obj;
return Objects.equals(keyQueryMetadata.activeHost, activeHost)
&& Objects.equals(keyQueryMetadata.standbyHosts, standbyHosts)
- && (Objects.equals(keyQueryMetadata.partition, partition)
- || Objects.equals(keyQueryMetadata.partitions, partitions));
+ && Objects.equals(keyQueryMetadata.partition, partition);
}
@Override
@@ -147,7 +126,6 @@ public class KeyQueryMetadata {
"activeHost=" + activeHost +
", standbyHosts=" + standbyHosts +
", partition=" + partition +
- ", partitions=" + partitions +
'}';
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 7217666bcf5..49fa34bc510 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Cluster;
@@ -461,25 +463,25 @@ public class StreamsMetadataState {
return rebuiltMetadata;
}
+ private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
+ if (!maybeMulticastPartitions.isPresent()) {
+ return null;
+ }
+ if (maybeMulticastPartitions.get().size() != 1) {
+ throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for key should be a singleton set");
+ }
+ return maybeMulticastPartitions.get().iterator().next();
+ };
+
private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo) {
- // Making an assumption that the partitions method won't return an empty Optional set
- // which means it is not intended to use the default partitioner. It is an optimistic
- // assumption, but the older implementation with partition() also made the same assumption.
- final Set<Integer> partitions = partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions).get();
- // The record was dropped and hence won't be found anywhere
- if (partitions.isEmpty()) {
- return new KeyQueryMetadata(UNKNOWN_HOST, Collections.emptySet(), UNKNOWN_PARTITION);
- }
-
+ final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
final Set<TopicPartition> matchingPartitions = new HashSet<>();
for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
- for (final Integer partition : partitions) {
- matchingPartitions.add(new TopicPartition(sourceTopic, partition));
- }
+ matchingPartitions.add(new TopicPartition(sourceTopic, partition));
}
HostInfo activeHost = UNKNOWN_HOST;
@@ -501,7 +503,7 @@ public class StreamsMetadataState {
}
}
- return new KeyQueryMetadata(activeHost, standbyHosts, partitions);
+ return new KeyQueryMetadata(activeHost, standbyHosts, partition);
}
private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
@@ -510,21 +512,10 @@ public class StreamsMetadataState {
final SourceTopicsInfo sourceTopicsInfo,
final String topologyName) {
Objects.requireNonNull(topologyName, "topology name must not be null");
-
- // Making an assumption that the partitions method won't return an empty Optional set
- // which means it is not intended to use the default partitioner. It is an optimistic
- // assumption, but the older implementation with partition() also made the same assumption.
- final Set<Integer> partitions = partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions).get();
- // The record was dropped and hence won't be found anywhere
- if (partitions.isEmpty()) {
- return new KeyQueryMetadata(UNKNOWN_HOST, Collections.emptySet(), UNKNOWN_PARTITION);
- }
-
+ final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
final Set<TopicPartition> matchingPartitions = new HashSet<>();
for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
- for (final Integer partition : partitions) {
- matchingPartitions.add(new TopicPartition(sourceTopic, partition));
- }
+ matchingPartitions.add(new TopicPartition(sourceTopic, partition));
}
HostInfo activeHost = UNKNOWN_HOST;
@@ -550,7 +541,7 @@ public class StreamsMetadataState {
}
}
- return new KeyQueryMetadata(activeHost, standbyHosts, partitions);
+ return new KeyQueryMetadata(activeHost, standbyHosts, partition);
}
private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 44e690b3000..260d6992f29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -71,7 +70,6 @@ import java.util.Set;
import java.util.Collections;
import static java.util.Collections.singletonList;
-import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
@@ -119,57 +117,10 @@ public class StoreQueryIntegrationTest {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
}
+ streamsToCleanup.clear();
cluster.stop();
}
- @Test
- public void shouldReturnAllPartitionsWhenRecordIsBroadcast() throws Exception {
-
- class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
- @Override
- @Deprecated
- public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
- return null;
- }
-
- @Override
- public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
- return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
- }
- }
-
- final int batch1NumMessages = 1;
- final int key = 1;
- final Semaphore semaphore = new Semaphore(0);
-
- final StreamsBuilder builder = new StreamsBuilder();
- getStreamsBuilderWithTopology(builder, semaphore);
-
- final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
-
- startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1), Duration.ofSeconds(60));
-
- final Properties producerProps = new Properties();
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-
- final List<KeyValueTimestamp<Integer, Integer>> records = Collections.singletonList(new KeyValueTimestamp<>(key, 0, mockTime.milliseconds()));
-
- // Send the record to both partitions of INPUT_TOPIC_NAME.
- IntegrationTestUtils.produceSynchronously(producerProps, false, INPUT_TOPIC_NAME, Optional.of(0), records);
- IntegrationTestUtils.produceSynchronously(producerProps, false, INPUT_TOPIC_NAME, Optional.of(1), records);
-
- assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-
- until(() -> {
- final KeyQueryMetadata keyQueryMetadataFetched = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new BroadcastingPartitioner());
- assertThat(keyQueryMetadataFetched.activeHost().host(), is("localhost"));
- assertThat(keyQueryMetadataFetched.partitions(), is(mkSet(0, 1)));
- return true;
- });
- }
-
@Test
public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
final int batch1NumMessages = 100;
@@ -197,7 +148,7 @@ public class StoreQueryIntegrationTest {
final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
- final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+ final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 0;
try {
if (kafkaStreams1IsActive) {
@@ -590,6 +541,40 @@ public class StoreQueryIntegrationTest {
});
}
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions() throws Exception {
+
+ class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
+ @Override
+ @Deprecated
+ public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
+ return null;
+ }
+
+ @Override
+ public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
+ return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
+ }
+ }
+
+ final int batch1NumMessages = 1;
+ final int key = 1;
+ final Semaphore semaphore = new Semaphore(0);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ getStreamsBuilderWithTopology(builder, semaphore);
+
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
+
+ startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1), Duration.ofSeconds(60));
+ produceValueRange(key, 0, batch1NumMessages);
+
+ assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+ kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new BroadcastingPartitioner());
+ }
+
+
private Matcher<String> retriableException() {
return is(
anyOf(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e04acca447a..2343ece6702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -248,7 +248,7 @@ public class StreamsMetadataStateTest {
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
- final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), Collections.singleton(0));
+ final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), 0);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
"the-key",
Serdes.String().serializer());
@@ -263,30 +263,27 @@ public class StreamsMetadataStateTest {
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
- final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), Collections.singleton(1));
+ final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), 1);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
"the-key",
partitioner);
assertEquals(expected, actual);
assertEquals(1, actual.partition());
- assertEquals(Collections.singleton(1), actual.partitions());
}
@Test
- public void shouldGetInstanceWithKeyAndCustomMulticastingPartitioner() {
- final TopicPartition tp4 = new TopicPartition("topic-three", 0);
- final TopicPartition tp5 = new TopicPartition("topic-three", 1);
- hostToActivePartitions.put(hostTwo, mkSet(tp4, tp5));
+ public void shouldFailWhenIqQueriedWithCustomPartitionerReturningMultiplePartitions() {
+ final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+ hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
+
+ metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
+ cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
- final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, Collections.singleton(hostTwo), mkSet(0, 1));
- final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
+ assertThrows(IllegalArgumentException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three",
"the-key",
- new MultiValuedPartitioner());
- assertEquals(expected, actual);
- assertEquals(-1, actual.partition());
- assertEquals(mkSet(0, 1), actual.partitions());
+ new MultiValuedPartitioner()));
}
@Test
@@ -304,7 +301,7 @@ public class StreamsMetadataStateTest {
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
- final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), Collections.singleton(2));
+ final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("merged-table", "the-key",
(topic, key, value, numPartitions) -> 2);