You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/15 23:16:15 UTC

[kafka] 02/02: Removing Multicasting partitioner for IQ (#12977)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2e1d8139a9b450ea0f8787de94577f391ac10dd9
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Fri Dec 16 04:39:41 2022 +0530

    Removing Multicasting partitioner for IQ (#12977)
    
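    Follow-up PR for KIP-837. Multicasting should not be allowed for Interactive Queries (IQ); this PR enforces that restriction by requiring StreamPartitioner#partitions to return a singleton set when fetching KeyQueryMetadata for a key.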
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../org/apache/kafka/streams/KeyQueryMetadata.java | 24 +-----
 .../processor/internals/StreamsMetadataState.java  | 45 +++++------
 .../integration/StoreQueryIntegrationTest.java     | 87 +++++++++-------------
 .../internals/StreamsMetadataStateTest.java        | 25 +++----
 4 files changed, 66 insertions(+), 115 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
index 6461ee7423f..9ca495214d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
@@ -43,20 +43,10 @@ public class KeyQueryMetadata {
 
     private final int partition;
 
-    private final Set<Integer> partitions;
-
     public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHosts, final int partition) {
         this.activeHost = activeHost;
         this.standbyHosts = standbyHosts;
         this.partition = partition;
-        this.partitions = Collections.singleton(partition);
-    }
-
-    public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHosts, final Set<Integer> partitions) {
-        this.activeHost = activeHost;
-        this.standbyHosts = standbyHosts;
-        this.partition = partitions.size() == 1 ? partitions.iterator().next() : -1;
-        this.partitions = partitions;
     }
 
     /**
@@ -119,16 +109,6 @@ public class KeyQueryMetadata {
         return partition;
     }
 
-    /**
-     * Get the store partitions corresponding to the key.
-     * A Key can be on multiple partitions if it has been
-     * multicasted using StreamPartitioner#partitions method
-     * @return store partition number
-     */
-    public Set<Integer> partitions() {
-        return partitions;
-    }
-
     @Override
     public boolean equals(final Object obj) {
         if (!(obj instanceof KeyQueryMetadata)) {
@@ -137,8 +117,7 @@ public class KeyQueryMetadata {
         final KeyQueryMetadata keyQueryMetadata = (KeyQueryMetadata) obj;
         return Objects.equals(keyQueryMetadata.activeHost, activeHost)
             && Objects.equals(keyQueryMetadata.standbyHosts, standbyHosts)
-            && (Objects.equals(keyQueryMetadata.partition, partition)
-                || Objects.equals(keyQueryMetadata.partitions, partitions));
+            && Objects.equals(keyQueryMetadata.partition, partition);
     }
 
     @Override
@@ -147,7 +126,6 @@ public class KeyQueryMetadata {
                 "activeHost=" + activeHost +
                 ", standbyHosts=" + standbyHosts +
                 ", partition=" + partition +
-                ", partitions=" + partitions +
                 '}';
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 7217666bcf5..49fa34bc510 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
 
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.kafka.common.Cluster;
@@ -461,25 +463,25 @@ public class StreamsMetadataState {
         return rebuiltMetadata;
     }
 
+    private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
+        if (!maybeMulticastPartitions.isPresent()) {
+            return null;
+        }
+        if (maybeMulticastPartitions.get().size() != 1) {
+            throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for key should be a singleton set");
+        }
+        return maybeMulticastPartitions.get().iterator().next();
+    };
+
     private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                            final K key,
                                                            final StreamPartitioner<? super K, ?> partitioner,
                                                            final SourceTopicsInfo sourceTopicsInfo) {
 
-        // Making an assumption that the partitions method won't return an empty Optional set
-        // which means it is not intended to use the default partitioner. It is an optimistic
-        // assumption, but the older implementation with partition() also made the same assumption.
-        final Set<Integer> partitions = partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions).get();
-        // The record was dropped and hence won't be found anywhere
-        if (partitions.isEmpty()) {
-            return new KeyQueryMetadata(UNKNOWN_HOST, Collections.emptySet(), UNKNOWN_PARTITION);
-        }
-
+        final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
         final Set<TopicPartition> matchingPartitions = new HashSet<>();
         for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
-            for (final Integer partition : partitions) {
-                matchingPartitions.add(new TopicPartition(sourceTopic, partition));
-            }
+            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
         }
 
         HostInfo activeHost = UNKNOWN_HOST;
@@ -501,7 +503,7 @@ public class StreamsMetadataState {
             }
         }
 
-        return new KeyQueryMetadata(activeHost, standbyHosts, partitions);
+        return new KeyQueryMetadata(activeHost, standbyHosts, partition);
     }
 
     private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
@@ -510,21 +512,10 @@ public class StreamsMetadataState {
                                                            final SourceTopicsInfo sourceTopicsInfo,
                                                            final String topologyName) {
         Objects.requireNonNull(topologyName, "topology name must not be null");
-
-        // Making an assumption that the partitions method won't return an empty Optional set
-        // which means it is not intended to use the default partitioner. It is an optimistic
-        // assumption, but the older implementation with partition() also made the same assumption.
-        final Set<Integer> partitions = partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions).get();
-        // The record was dropped and hence won't be found anywhere
-        if (partitions.isEmpty()) {
-            return new KeyQueryMetadata(UNKNOWN_HOST, Collections.emptySet(), UNKNOWN_PARTITION);
-        }
-
+        final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
         final Set<TopicPartition> matchingPartitions = new HashSet<>();
         for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
-            for (final Integer partition : partitions) {
-                matchingPartitions.add(new TopicPartition(sourceTopic, partition));
-            }
+            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
         }
 
         HostInfo activeHost = UNKNOWN_HOST;
@@ -550,7 +541,7 @@ public class StreamsMetadataState {
             }
         }
 
-        return new KeyQueryMetadata(activeHost, standbyHosts, partitions);
+        return new KeyQueryMetadata(activeHost, standbyHosts, partition);
     }
 
     private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 44e690b3000..260d6992f29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyQueryMetadata;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -71,7 +70,6 @@ import java.util.Set;
 import java.util.Collections;
 
 import static java.util.Collections.singletonList;
-import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
@@ -119,57 +117,10 @@ public class StoreQueryIntegrationTest {
         for (final KafkaStreams kafkaStreams : streamsToCleanup) {
             kafkaStreams.close();
         }
+        streamsToCleanup.clear();
         cluster.stop();
     }
 
-    @Test
-    public void shouldReturnAllPartitionsWhenRecordIsBroadcast() throws Exception {
-
-        class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
-            @Override
-            @Deprecated
-            public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
-                return null;
-            }
-
-            @Override
-            public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
-                return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
-            }
-        }
-
-        final int batch1NumMessages = 1;
-        final int key = 1;
-        final Semaphore semaphore = new Semaphore(0);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        getStreamsBuilderWithTopology(builder, semaphore);
-
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
-
-        startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1), Duration.ofSeconds(60));
-
-        final Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-
-        final List<KeyValueTimestamp<Integer, Integer>> records = Collections.singletonList(new KeyValueTimestamp<>(key, 0, mockTime.milliseconds()));
-
-        // Send the record to both partitions of INPUT_TOPIC_NAME.
-        IntegrationTestUtils.produceSynchronously(producerProps, false, INPUT_TOPIC_NAME, Optional.of(0), records);
-        IntegrationTestUtils.produceSynchronously(producerProps, false, INPUT_TOPIC_NAME, Optional.of(1), records);
-
-        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-
-        until(() -> {
-            final KeyQueryMetadata keyQueryMetadataFetched = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new BroadcastingPartitioner());
-            assertThat(keyQueryMetadataFetched.activeHost().host(), is("localhost"));
-            assertThat(keyQueryMetadataFetched.partitions(), is(mkSet(0, 1)));
-            return true;
-        });
-    }
-
     @Test
     public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
         final int batch1NumMessages = 100;
@@ -197,7 +148,7 @@ public class StoreQueryIntegrationTest {
             final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
             final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
 
-            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 0;
 
             try {
                 if (kafkaStreams1IsActive) {
@@ -590,6 +541,40 @@ public class StoreQueryIntegrationTest {
         });
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions() throws Exception {
+
+        class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
+            @Override
+            @Deprecated
+            public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
+                return null;
+            }
+
+            @Override
+            public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
+                return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
+            }
+        }
+
+        final int batch1NumMessages = 1;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        getStreamsBuilderWithTopology(builder, semaphore);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
+
+        startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1), Duration.ofSeconds(60));
+        produceValueRange(key, 0, batch1NumMessages);
+
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new BroadcastingPartitioner());
+    }
+
+
     private Matcher<String> retriableException() {
         return is(
             anyOf(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e04acca447a..2343ece6702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -248,7 +248,7 @@ public class StreamsMetadataStateTest {
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
             cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), Collections.singleton(0));
+        final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), 0);
         final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
                                                                     "the-key",
                                                                     Serdes.String().serializer());
@@ -263,30 +263,27 @@ public class StreamsMetadataStateTest {
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
             cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), Collections.singleton(1));
+        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), 1);
 
         final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
                 "the-key",
                 partitioner);
         assertEquals(expected, actual);
         assertEquals(1, actual.partition());
-        assertEquals(Collections.singleton(1), actual.partitions());
     }
 
     @Test
-    public void shouldGetInstanceWithKeyAndCustomMulticastingPartitioner() {
-        final TopicPartition tp4 = new TopicPartition("topic-three", 0);
-        final TopicPartition tp5 = new TopicPartition("topic-three", 1);
-        hostToActivePartitions.put(hostTwo, mkSet(tp4, tp5));
+    public void shouldFailWhenIqQueriedWithCustomPartitionerReturningMultiplePartitions() {
+        final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+        hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
+
+        metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
+                cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, Collections.singleton(hostTwo), mkSet(0, 1));
 
-        final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
+        assertThrows(IllegalArgumentException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three",
                 "the-key",
-                new MultiValuedPartitioner());
-        assertEquals(expected, actual);
-        assertEquals(-1, actual.partition());
-        assertEquals(mkSet(0, 1), actual.partitions());
+                new MultiValuedPartitioner()));
     }
 
     @Test
@@ -304,7 +301,7 @@ public class StreamsMetadataStateTest {
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
                 cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), Collections.singleton(2));
+        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);
 
         final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("merged-table",  "the-key",
             (topic, key, value, numPartitions) -> 2);