Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/15 23:16:13 UTC

[kafka] branch 3.4 updated (30b59ea0640 -> 2e1d8139a9b)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a change to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 30b59ea0640 MINOR: update Streams upgrade guide for 3.1 release (#12926)
     new 975ab3bdae5 KAFKA-13602: Remove unwanted logging in RecordCollectorImpl.java (#12985)
     new 2e1d8139a9b Removing Multicasting partitioner for IQ (#12977)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kafka/streams/KeyQueryMetadata.java | 24 +-----
 .../processor/internals/RecordCollectorImpl.java   |  4 +-
 .../processor/internals/StreamsMetadataState.java  | 45 +++++------
 .../integration/StoreQueryIntegrationTest.java     | 87 +++++++++-------------
 .../internals/StreamsMetadataStateTest.java        | 25 +++----
 5 files changed, 68 insertions(+), 117 deletions(-)


[kafka] 01/02: KAFKA-13602: Remove unwanted logging in RecordCollectorImpl.java (#12985)

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 975ab3bdae5160732e8bc7e4017ea76de0c120e3
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Tue Dec 13 21:06:00 2022 +0530

    KAFKA-13602: Remove unwanted logging in RecordCollectorImpl.java (#12985)
    
    PR #12803 introduced unwanted logging, as pointed out in this comment: #12803 (comment). This PR removes it.
    
    Reviewers: Lucas Brutschy <lb...@confluent.io>, Bruno Cadonna <ca...@apache.org>
---
 .../apache/kafka/streams/processor/internals/RecordCollectorImpl.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 43c329896f6..51eec220833 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -159,7 +159,8 @@ public class RecordCollectorImpl implements RecordCollector {
                     final Set<Integer> multicastPartitions = maybeMulticastPartitions.get();
                     if (multicastPartitions.isEmpty()) {
                         // If a record is not to be sent to any partition, mark it as a dropped record.
-                        log.debug("Not sending the record with key {} , value {} to any partition", key, value);
+                        log.warn("Skipping record as partitioner returned empty partitions. "
+                                + "topic=[{}]", topic);
                         droppedRecordsSensor.record();
                     } else {
                         for (final int multicastPartition: multicastPartitions) {
@@ -227,7 +228,6 @@ public class RecordCollectorImpl implements RecordCollector {
 
             if (exception == null) {
                 final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                log.info("Produced key:{}, value:{} successfully to tp:{}", key, value, tp);
                 if (metadata.offset() >= 0L) {
                     offsets.put(tp, metadata.offset());
                 } else {
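
For readers following along: the WARN added above fires on the dropped-record path, which is reached when a KIP-837 StreamPartitioner#partitions implementation returns a present but empty set. Below is a minimal sketch of such a partitioner (the class name is illustrative, not part of the commit); with this change the collector counts the drop via the dropped-records sensor and logs only the topic, so keys and values no longer appear in the log output.

import java.util.Collections;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.streams.processor.StreamPartitioner;

// Illustrative only: a partitioner that drops every record by returning an
// empty (but present) partition set from the KIP-837 partitions() method.
public class DropAllPartitioner implements StreamPartitioner<String, String> {

    @Override
    @Deprecated
    public Integer partition(final String topic, final String key, final String value, final int numPartitions) {
        return null; // ignored once partitions() is overridden
    }

    @Override
    public Optional<Set<Integer>> partitions(final String topic, final String key, final String value, final int numPartitions) {
        // Empty set => RecordCollectorImpl records a dropped record and, after
        // this commit, logs a WARN naming only the topic (no key or value).
        return Optional.of(Collections.emptySet());
    }
}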


[kafka] 02/02: Removing Multicasting partitioner for IQ (#12977)

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2e1d8139a9b450ea0f8787de94577f391ac10dd9
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Fri Dec 16 04:39:41 2022 +0530

    Removing Multicasting partitioner for IQ (#12977)
    
    Follow-up PR for KIP-837. We don't want to allow multicasting for IQ, so this PR imposes that restriction.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../org/apache/kafka/streams/KeyQueryMetadata.java | 24 +-----
 .../processor/internals/StreamsMetadataState.java  | 45 +++++------
 .../integration/StoreQueryIntegrationTest.java     | 87 +++++++++-------------
 .../internals/StreamsMetadataStateTest.java        | 25 +++----
 4 files changed, 66 insertions(+), 115 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
index 6461ee7423f..9ca495214d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
@@ -43,20 +43,10 @@ public class KeyQueryMetadata {
 
     private final int partition;
 
-    private final Set<Integer> partitions;
-
     public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHosts, final int partition) {
         this.activeHost = activeHost;
         this.standbyHosts = standbyHosts;
         this.partition = partition;
-        this.partitions = Collections.singleton(partition);
-    }
-
-    public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHosts, final Set<Integer> partitions) {
-        this.activeHost = activeHost;
-        this.standbyHosts = standbyHosts;
-        this.partition = partitions.size() == 1 ? partitions.iterator().next() : -1;
-        this.partitions = partitions;
     }
 
     /**
@@ -119,16 +109,6 @@ public class KeyQueryMetadata {
         return partition;
     }
 
-    /**
-     * Get the store partitions corresponding to the key.
-     * A Key can be on multiple partitions if it has been
-     * multicasted using StreamPartitioner#partitions method
-     * @return store partition number
-     */
-    public Set<Integer> partitions() {
-        return partitions;
-    }
-
     @Override
     public boolean equals(final Object obj) {
         if (!(obj instanceof KeyQueryMetadata)) {
@@ -137,8 +117,7 @@ public class KeyQueryMetadata {
         final KeyQueryMetadata keyQueryMetadata = (KeyQueryMetadata) obj;
         return Objects.equals(keyQueryMetadata.activeHost, activeHost)
             && Objects.equals(keyQueryMetadata.standbyHosts, standbyHosts)
-            && (Objects.equals(keyQueryMetadata.partition, partition)
-                || Objects.equals(keyQueryMetadata.partitions, partitions));
+            && Objects.equals(keyQueryMetadata.partition, partition);
     }
 
     @Override
@@ -147,7 +126,6 @@ public class KeyQueryMetadata {
                 "activeHost=" + activeHost +
                 ", standbyHosts=" + standbyHosts +
                 ", partition=" + partition +
-                ", partitions=" + partitions +
                 '}';
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 7217666bcf5..49fa34bc510 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
 
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.kafka.common.Cluster;
@@ -461,25 +463,25 @@ public class StreamsMetadataState {
         return rebuiltMetadata;
     }
 
+    private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
+        if (!maybeMulticastPartitions.isPresent()) {
+            return null;
+        }
+        if (maybeMulticastPartitions.get().size() != 1) {
+            throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for key should be a singleton set");
+        }
+        return maybeMulticastPartitions.get().iterator().next();
+    };
+
     private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
                                                            final K key,
                                                            final StreamPartitioner<? super K, ?> partitioner,
                                                            final SourceTopicsInfo sourceTopicsInfo) {
 
-        // Making an assumption that the partitions method won't return an empty Optional set
-        // which means it is not intended to use the default partitioner. It is an optimistic
-        // assumption, but the older implementation with partition() also made the same assumption.
-        final Set<Integer> partitions = partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions).get();
-        // The record was dropped and hence won't be found anywhere
-        if (partitions.isEmpty()) {
-            return new KeyQueryMetadata(UNKNOWN_HOST, Collections.emptySet(), UNKNOWN_PARTITION);
-        }
-
+        final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
         final Set<TopicPartition> matchingPartitions = new HashSet<>();
         for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
-            for (final Integer partition : partitions) {
-                matchingPartitions.add(new TopicPartition(sourceTopic, partition));
-            }
+            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
         }
 
         HostInfo activeHost = UNKNOWN_HOST;
@@ -501,7 +503,7 @@ public class StreamsMetadataState {
             }
         }
 
-        return new KeyQueryMetadata(activeHost, standbyHosts, partitions);
+        return new KeyQueryMetadata(activeHost, standbyHosts, partition);
     }
 
     private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
@@ -510,21 +512,10 @@ public class StreamsMetadataState {
                                                            final SourceTopicsInfo sourceTopicsInfo,
                                                            final String topologyName) {
         Objects.requireNonNull(topologyName, "topology name must not be null");
-
-        // Making an assumption that the partitions method won't return an empty Optional set
-        // which means it is not intended to use the default partitioner. It is an optimistic
-        // assumption, but the older implementation with partition() also made the same assumption.
-        final Set<Integer> partitions = partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions).get();
-        // The record was dropped and hence won't be found anywhere
-        if (partitions.isEmpty()) {
-            return new KeyQueryMetadata(UNKNOWN_HOST, Collections.emptySet(), UNKNOWN_PARTITION);
-        }
-
+        final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
         final Set<TopicPartition> matchingPartitions = new HashSet<>();
         for (final String sourceTopic : sourceTopicsInfo.sourceTopics) {
-            for (final Integer partition : partitions) {
-                matchingPartitions.add(new TopicPartition(sourceTopic, partition));
-            }
+            matchingPartitions.add(new TopicPartition(sourceTopic, partition));
         }
 
         HostInfo activeHost = UNKNOWN_HOST;
@@ -550,7 +541,7 @@ public class StreamsMetadataState {
             }
         }
 
-        return new KeyQueryMetadata(activeHost, standbyHosts, partitions);
+        return new KeyQueryMetadata(activeHost, standbyHosts, partition);
     }
 
     private SourceTopicsInfo getSourceTopicsInfo(final String storeName) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 44e690b3000..260d6992f29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyQueryMetadata;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -71,7 +70,6 @@ import java.util.Set;
 import java.util.Collections;
 
 import static java.util.Collections.singletonList;
-import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
@@ -119,57 +117,10 @@ public class StoreQueryIntegrationTest {
         for (final KafkaStreams kafkaStreams : streamsToCleanup) {
             kafkaStreams.close();
         }
+        streamsToCleanup.clear();
         cluster.stop();
     }
 
-    @Test
-    public void shouldReturnAllPartitionsWhenRecordIsBroadcast() throws Exception {
-
-        class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
-            @Override
-            @Deprecated
-            public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
-                return null;
-            }
-
-            @Override
-            public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
-                return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
-            }
-        }
-
-        final int batch1NumMessages = 1;
-        final int key = 1;
-        final Semaphore semaphore = new Semaphore(0);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        getStreamsBuilderWithTopology(builder, semaphore);
-
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
-
-        startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1), Duration.ofSeconds(60));
-
-        final Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-
-        final List<KeyValueTimestamp<Integer, Integer>> records = Collections.singletonList(new KeyValueTimestamp<>(key, 0, mockTime.milliseconds()));
-
-        // Send the record to both partitions of INPUT_TOPIC_NAME.
-        IntegrationTestUtils.produceSynchronously(producerProps, false, INPUT_TOPIC_NAME, Optional.of(0), records);
-        IntegrationTestUtils.produceSynchronously(producerProps, false, INPUT_TOPIC_NAME, Optional.of(1), records);
-
-        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-
-        until(() -> {
-            final KeyQueryMetadata keyQueryMetadataFetched = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new BroadcastingPartitioner());
-            assertThat(keyQueryMetadataFetched.activeHost().host(), is("localhost"));
-            assertThat(keyQueryMetadataFetched.partitions(), is(mkSet(0, 1)));
-            return true;
-        });
-    }
-
     @Test
     public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
         final int batch1NumMessages = 100;
@@ -197,7 +148,7 @@ public class StoreQueryIntegrationTest {
             final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
             final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
 
-            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 0;
 
             try {
                 if (kafkaStreams1IsActive) {
@@ -590,6 +541,40 @@ public class StoreQueryIntegrationTest {
         });
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions() throws Exception {
+
+        class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
+            @Override
+            @Deprecated
+            public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
+                return null;
+            }
+
+            @Override
+            public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
+                return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
+            }
+        }
+
+        final int batch1NumMessages = 1;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        getStreamsBuilderWithTopology(builder, semaphore);
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
+
+        startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams1), Duration.ofSeconds(60));
+        produceValueRange(key, 0, batch1NumMessages);
+
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, new BroadcastingPartitioner());
+    }
+
+
     private Matcher<String> retriableException() {
         return is(
             anyOf(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e04acca447a..2343ece6702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -248,7 +248,7 @@ public class StreamsMetadataStateTest {
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
             cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), Collections.singleton(0));
+        final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), 0);
         final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
                                                                     "the-key",
                                                                     Serdes.String().serializer());
@@ -263,30 +263,27 @@ public class StreamsMetadataStateTest {
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
             cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), Collections.singleton(1));
+        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), 1);
 
         final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
                 "the-key",
                 partitioner);
         assertEquals(expected, actual);
         assertEquals(1, actual.partition());
-        assertEquals(Collections.singleton(1), actual.partitions());
     }
 
     @Test
-    public void shouldGetInstanceWithKeyAndCustomMulticastingPartitioner() {
-        final TopicPartition tp4 = new TopicPartition("topic-three", 0);
-        final TopicPartition tp5 = new TopicPartition("topic-three", 1);
-        hostToActivePartitions.put(hostTwo, mkSet(tp4, tp5));
+    public void shouldFailWhenIqQueriedWithCustomPartitionerReturningMultiplePartitions() {
+        final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+        hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
+
+        metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
+                cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, Collections.singleton(hostTwo), mkSet(0, 1));
 
-        final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
+        assertThrows(IllegalArgumentException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three",
                 "the-key",
-                new MultiValuedPartitioner());
-        assertEquals(expected, actual);
-        assertEquals(-1, actual.partition());
-        assertEquals(mkSet(0, 1), actual.partitions());
+                new MultiValuedPartitioner()));
     }
 
     @Test
@@ -304,7 +301,7 @@ public class StreamsMetadataStateTest {
         metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
                 cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
 
-        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), Collections.singleton(2));
+        final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);
 
         final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("merged-table",  "the-key",
             (topic, key, value, numPartitions) -> 2);
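
For readers following along: with the change above, KeyQueryMetadata is single-partition again and StreamsMetadataState rejects a partitioner that resolves a key to more than one partition. A minimal sketch of what that looks like from the caller's side is below, assuming a running KafkaStreams instance and a store named "my-store" (both placeholders); the BroadcastingPartitioner mirrors the one in the new integration test.

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class MulticastIqExample {

    // Same shape as the BroadcastingPartitioner in the test above: every key
    // is routed to all partitions of the topic.
    static class BroadcastingPartitioner implements StreamPartitioner<Integer, String> {
        @Override
        @Deprecated
        public Integer partition(final String topic, final Integer key, final String value, final int numPartitions) {
            return null;
        }

        @Override
        public Optional<Set<Integer>> partitions(final String topic, final Integer key, final String value, final int numPartitions) {
            return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
        }
    }

    // "streams" must be a running application and "my-store" one of its state
    // stores; both are placeholders for this sketch.
    static void lookUpKey(final KafkaStreams streams) {
        // Throws IllegalArgumentException after this commit: the partitions
        // returned for a key query must be a singleton set.
        final KeyQueryMetadata metadata =
            streams.queryMetadataForKey("my-store", 1, new BroadcastingPartitioner());
        System.out.println(metadata.activeHost() + " partition=" + metadata.partition());
    }
}

A partitioner whose partitions() method resolves the key to exactly one partition keeps working for key queries; only the multi-partition case now throws IllegalArgumentException, as exercised by the new shouldFailWithIllegalArgumentExceptionWhenIQPartitionerReturnsMultiplePartitions test.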