You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 07:04:00 UTC

[jira] [Commented] (KAFKA-3633) Kafka Consumer API breaking backward compatibility

    [ https://issues.apache.org/jira/browse/KAFKA-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375966#comment-16375966 ] 

ASF GitHub Bot commented on KAFKA-3633:
---------------------------------------

hachikuji closed pull request #1281: KAFKA-3633: Kafka Consumer API breaking backward compatibility
URL: https://github.com/apache/kafka/pull/1281
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ad44d162500..3b79002aa17 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1,14 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under
+ * the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.consumer;
 
@@ -45,6 +45,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -512,8 +513,8 @@ public KafkaConsumer(Map<String, Object> configs,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
         this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
-            keyDeserializer,
-            valueDeserializer);
+                keyDeserializer,
+                valueDeserializer);
     }
 
     /**
@@ -542,8 +543,8 @@ public KafkaConsumer(Properties properties,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
         this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
-             keyDeserializer,
-             valueDeserializer);
+                keyDeserializer,
+                valueDeserializer);
     }
 
     @SuppressWarnings("unchecked")
@@ -874,7 +875,7 @@ public void assign(Collection<TopicPartition> partitions) {
                     fetcher.sendFetches(metadata.fetch());
                     client.quickPoll(false);
                     return this.interceptors == null
-                        ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
+                            ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }
 
                 long elapsed = time.milliseconds() - start;
@@ -1059,6 +1060,19 @@ public void seek(TopicPartition partition, long offset) {
         }
     }
 
+    /**
+     * @deprecated Varargs overload retained for backward compatibility; delegates to {@link #seekToBeginning(java.util.Collection)}.
+     */
+    @Deprecated
+    public void seekToBeginning(TopicPartition... partitions) {
+        acquire();
+        try {
+            seekToBeginning(Arrays.asList(partitions)); // a List argument binds to the Collection overload, not this one
+        } finally {
+            release(); // always paired with the acquire() above, even if the delegate throws
+        }
+    }
+
     /**
      * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
      * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
@@ -1077,6 +1091,19 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
         }
     }
 
+    /**
+     * @deprecated Varargs overload retained for backward compatibility; delegates to {@link #seekToEnd(java.util.Collection)}.
+     */
+    @Deprecated
+    public void seekToEnd(TopicPartition... partitions) {
+        acquire();
+        try {
+            seekToEnd(Arrays.asList(partitions)); // a List argument binds to the Collection overload, not this one
+        } finally {
+            release(); // always paired with the acquire() above, even if the delegate throws
+        }
+    }
+
     /**
      * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
      * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
@@ -1219,6 +1246,19 @@ public OffsetAndMetadata committed(TopicPartition partition) {
         }
     }
 
+    /**
+     * @deprecated Varargs overload retained for backward compatibility; delegates to {@link #pause(java.util.Collection)}.
+     */
+    @Deprecated
+    public void pause(TopicPartition... partitions) {
+        acquire();
+        try {
+            pause(Arrays.asList(partitions)); // a List argument binds to the Collection overload, not this one
+        } finally {
+            release(); // always paired with the acquire() above, even if the delegate throws
+        }
+    }
+
     /**
      * Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
      * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
@@ -1230,7 +1270,7 @@ public OffsetAndMetadata committed(TopicPartition partition) {
     public void pause(Collection<TopicPartition> partitions) {
         acquire();
         try {
-            for (TopicPartition partition: partitions) {
+            for (TopicPartition partition : partitions) {
                 log.debug("Pausing partition {}", partition);
                 subscriptions.pause(partition);
             }
@@ -1239,6 +1279,19 @@ public void pause(Collection<TopicPartition> partitions) {
         }
     }
 
+    /**
+     * @deprecated Varargs overload retained for backward compatibility; delegates to {@link #resume(java.util.Collection)}.
+     */
+    @Deprecated
+    public void resume(TopicPartition... partitions) {
+        acquire();
+        try {
+            resume(Arrays.asList(partitions)); // delegate to resume (not pause); a List binds to the Collection overload
+        } finally {
+            release(); // always paired with the acquire() above, even if the delegate throws
+        }
+    }
+
     /**
      * Resume specified partitions which have been paused with {@link #pause(Collection)}. New calls to
      * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
@@ -1249,7 +1302,7 @@ public void pause(Collection<TopicPartition> partitions) {
     public void resume(Collection<TopicPartition> partitions) {
         acquire();
         try {
-            for (TopicPartition partition: partitions) {
+            for (TopicPartition partition : partitions) {
                 log.debug("Resuming partition {}", partition);
                 subscriptions.resume(partition);
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Kafka Consumer API breaking backward compatibility
> --------------------------------------------------
>
>                 Key: KAFKA-3633
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3633
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Sriharsha Chintalapani
>            Assignee: Sriharsha Chintalapani
>            Priority: Blocker
>             Fix For: 0.10.0.0
>
>
> KAFKA-2991 and KAFKA-3006 broke backward compatibility. In Storm we are already using the 0.9.0.1 consumer API for the KafkaSpout. We should have at least kept the older methods and shouldn't be breaking backward compatibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)