Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/30 18:25:32 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

guozhangwang commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r418201398



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -838,6 +838,7 @@
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+        tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000);

Review comment:
       nit: Could you also add the original comment explaining why we set it to a smaller value?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1064,14 +1071,16 @@ private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>
 
     // must be public to allow KafkaProducer to instantiate it
     public static class KeyPartitioner implements Partitioner {
+        private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
         @Override
         public int partition(final String topic,
                              final Object key,
                              final byte[] keyBytes,
                              final Object value,
                              final byte[] valueBytes,
                              final Cluster cluster) {
-            return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
+            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;

Review comment:
       Hmm, this suggests to me that the `StreamsProducer`'s own `partitionsFor` did not return the number of partitions, so we ended up calling `send` with `partition == null`. Otherwise we would have computed the `partition` as
   
   ```
   partition = partitioner.partition(topic, key, value, partitions.size());
   ```
   
   where `partitioner` is the `StreamPartitioner`, in which case the producer's own partitioner should not be used.
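
To make the suspected code path concrete, here is a minimal, dependency-free sketch of the fallback described above. It is not the actual Streams or producer code: `PartitionChoiceSketch`, `SketchStreamPartitioner`, `choosePartition`, and `producerPartition` are hypothetical names, `NUM_TOPIC_PARTITIONS = 4` is an assumed value, and the key bytes are assumed to be the 8-byte big-endian encoding of a `long`.

```java
import java.nio.ByteBuffer;
import java.util.List;

public class PartitionChoiceSketch {
    // Assumed partition count, for illustration only.
    static final int NUM_TOPIC_PARTITIONS = 4;

    // Hypothetical stand-in for a Streams-level partitioner (not the Kafka interface).
    interface SketchStreamPartitioner {
        Integer partition(String topic, Long key, Long value, int numPartitions);
    }

    // Hypothetical stand-in for the producer-level partitioner: it decodes the
    // serialized key bytes (assumed 8-byte big-endian long) and applies the modulo.
    static int producerPartition(final byte[] keyBytes) {
        final long key = ByteBuffer.wrap(keyBytes).getLong();
        return (int) key % NUM_TOPIC_PARTITIONS;
    }

    // Models the suspected behavior: the Streams-level partitioner is consulted only
    // when the partition list is known; otherwise the partition stays null and the
    // producer-level partitioner decides.
    static int choosePartition(final List<Integer> knownPartitions,
                               final SketchStreamPartitioner streamPartitioner,
                               final String topic,
                               final long key,
                               final long value) {
        Integer partition = null;
        if (!knownPartitions.isEmpty()) {
            partition = streamPartitioner.partition(topic, key, value, knownPartitions.size());
        }
        if (partition != null) {
            return partition; // producer-level partitioner is bypassed
        }
        final byte[] keyBytes = ByteBuffer.allocate(Long.BYTES).putLong(key).array();
        return producerPartition(keyBytes); // fallback path
    }

    public static void main(final String[] args) {
        // Deliberately differs from the producer-level rule so the two paths are distinguishable.
        final SketchStreamPartitioner sp = (t, k, v, n) -> (int) ((k + 1) % n);
        System.out.println(choosePartition(List.of(0, 1, 2, 3), sp, "topic", 5L, 0L)); // prints 2
        System.out.println(choosePartition(List.of(), sp, "topic", 5L, 0L));           // prints 1
    }
}
```

In this model, the producer-level partitioner is reached only when the partition list comes back empty, which is the situation the comment suspects.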



