You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/02 15:51:44 UTC

[pulsar] branch master updated: Issue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new deb6492  Issue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)
deb6492 is described below

commit deb6492b579a062c3b6d8b0e611461d8c26f87d9
Author: Guillaume Rosauro <34...@users.noreply.github.com>
AuthorDate: Tue Jul 2 17:51:39 2019 +0200

    Issue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)
    
    * Issue #4638: Update Kafka connect-api to version 2.3.0
    
    * Remove the 'block.on.buffer.full' property (already removed from Kafka)
---
 pom.xml                                            |  2 +-
 .../clients/consumer/PulsarKafkaConsumer.java      | 66 ++++++++++++++++++++++
 .../clients/producer/PulsarKafkaProducer.java      | 58 +++++++++++--------
 .../compat/KafkaProducerInterceptorWrapper.java    |  6 +-
 .../io/kafka/connect/KafkaConnectSource.java       |  5 +-
 .../kafka/connect/PulsarIOSourceTaskContext.java   | 11 +++-
 .../io/kafka/connect/KafkaConnectSourceTest.java   |  1 +
 site2/docs/adaptors-kafka.md                       |  1 -
 8 files changed, 119 insertions(+), 31 deletions(-)

diff --git a/pom.xml b/pom.xml
index 427035c..9874e06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@ flexible messaging model and an intuitive client API.</description>
     <hbc-core.version>2.2.0</hbc-core.version>
     <cassandra-driver-core.version>3.6.0</cassandra-driver-core.version>
     <aerospike-client.version>4.1.11</aerospike-client.version>
-    <kafka-client.version>0.10.2.1</kafka-client.version>
+    <kafka-client.version>2.3.0</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
     <aws-sdk.version>1.11.297</aws-sdk.version>
     <avro.version>1.8.2</avro.version>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 0c0059f..bbab13e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collection;
@@ -308,6 +309,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public void subscribe(Pattern pattern) {
+        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
+    }
+
+    @Override
     public void unsubscribe() {
         consumers.values().forEach(c -> {
             try {
@@ -386,6 +392,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         }
     }
 
+    @Override
+    public ConsumerRecords<K, V> poll(Duration duration) {
+        return poll(duration.toMillis());
+    }
+
     @SuppressWarnings("unchecked")
     private K getKey(String topic, Message<byte[]> msg) {
         if (!msg.hasKey()) {
@@ -411,6 +422,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public void commitSync(Duration duration) {
+        commitSync();
+    }
+
+    @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
         try {
             doCommitOffsets(offsets).get();
@@ -420,6 +436,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
+        commitSync(map);
+    }
+
+    @Override
     public void commitAsync() {
         doCommitOffsets(getCurrentOffsetsMap());
     }
@@ -517,6 +538,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
+        seek(topicPartition, offsetAndMetadata.offset());
+    }
+
+    @Override
     public void seekToBeginning(Collection<TopicPartition> partitions) {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
@@ -571,6 +597,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         return unpolledPartitions.contains(partition) ? 0 : offset;
     }
 
+    @Override
+    public long position(TopicPartition topicPartition, Duration duration) {
+        return position(topicPartition);
+    }
+
     private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
     	log.info("Resetting partition {} and seeking to {} position", partition, strategy);
         if (strategy == SubscriptionInitialPosition.Earliest) {
@@ -587,6 +618,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
+        return committed(topicPartition);
+    }
+
+    @Override
     public Map<MetricName, ? extends Metric> metrics() {
         throw new UnsupportedOperationException();
     }
@@ -597,11 +633,21 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public List<PartitionInfo> partitionsFor(String s, Duration duration) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public Map<String, List<PartitionInfo>> listTopics() {
         throw new UnsupportedOperationException();
     }
 
     @Override
+    public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public Set<TopicPartition> paused() {
         throw new UnsupportedOperationException();
     }
@@ -622,16 +668,31 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public void close() {
         close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
     }
@@ -652,6 +713,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     }
 
     @Override
+    public void close(Duration duration) {
+        close(duration.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
     public void wakeup() {
         throw new UnsupportedOperationException();
     }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 8ae3bf3..00c49bb 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -19,6 +19,7 @@
 package org.apache.kafka.clients.producer;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,11 +35,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.CompressionType;
@@ -152,13 +155,10 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
         pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
 
-        boolean blockOnBufferFull = Boolean
-                .parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"));
-
         // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
         // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
         // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
-        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
+        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
         pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
 
         interceptors = (List) producerConfig.getConfiguredInstances(
@@ -166,6 +166,31 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     }
 
     @Override
+    public void initTransactions() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void beginTransaction() throws ProducerFencedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s) throws ProducerFencedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void commitTransaction() throws ProducerFencedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void abortTransaction() throws ProducerFencedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
         return send(record, null);
     }
@@ -240,6 +265,11 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    @Override
+    public void close(Duration duration) {
+        close(duration.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
     private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
         try {
             // Add the partitions info for the new topic
@@ -317,25 +347,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
         TopicPartition tp = new TopicPartition(topic, partition);
         TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
-        return new RecordMetadata(tp, offset, 0, mb.getPublishTime(), 0, mb.hasKey() ? mb.getKey().length() : 0, size);
-    }
-
-    private ProducerInterceptor createKafkaProducerInterceptor(String clazz) {
-        try {
-            return (ProducerInterceptor) Class.forName(clazz).newInstance();
-        } catch (ClassNotFoundException e) {
-            String errorMessage = "Can't find Interceptor class: " + e.getMessage();
-            logger.error(errorMessage);
-            throw new RuntimeException(errorMessage);
-        } catch (InstantiationException e) {
-            String errorMessage = "Can't initiate provided Interceptor class: " + e.getMessage();
-            logger.error(errorMessage);
-            throw new RuntimeException(errorMessage);
-        } catch (IllegalAccessException e) {
-            String errorMessage = "Can't access provided Interceptor class: " + e.getMessage();
-            logger.error(errorMessage);
-            throw new RuntimeException(errorMessage);
-        }
+        return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
     }
 
     private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
index 5cedef9..24697cd 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -141,10 +141,10 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
             partitionID = getPartitionID(messageMetadataBuilder);
             TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitionID));
             kafkaProducerInterceptor.onAcknowledgement(new RecordMetadata(topicPartition,
-                                                                -1,
-                                                                -1,
+                                                                -1L,
+                                                                -1L,
                                                                 messageMetadataBuilder.getEventTime(),
-                                                                -1,
+                                                                -1L,
                                                                 message.getKeyBytes().length,
                                                                 message.getValue().length), new Exception(exception));
         } catch (NumberFormatException e) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 3ccbb4b..d9662c4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -105,7 +105,8 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
         valueConverter.configure(config, false);
 
         offsetStore = new PulsarOffsetBackingStore();
-        offsetStore.configure(new PulsarKafkaWorkerConfig(stringConfig));
+        PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
+        offsetStore.configure(pulsarKafkaWorkerConfig);
         offsetStore.start();
 
         offsetReader = new OffsetStorageReaderImpl(
@@ -121,7 +122,7 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
             valueConverter
         );
 
-        sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader);
+        sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);
 
         sourceTask.initialize(sourceTaskContext);
         sourceTask.start(stringConfig);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
index 561a090..1445534 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
@@ -21,12 +21,21 @@ package org.apache.pulsar.io.kafka.connect;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
+import java.util.Map;
+
 class PulsarIOSourceTaskContext implements SourceTaskContext {
 
     private final OffsetStorageReader reader;
+    private final PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig;
 
-    PulsarIOSourceTaskContext(OffsetStorageReader reader) {
+    PulsarIOSourceTaskContext(OffsetStorageReader reader, PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig) {
         this.reader = reader;
+        this.pulsarKafkaWorkerConfig = pulsarKafkaWorkerConfig;
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return pulsarKafkaWorkerConfig.originalsStrings();
     }
 
     @Override
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 7ff8845..4e4bf81 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -67,6 +67,7 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase  {
         config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
         tempFile = File.createTempFile("some-file-name", null);
         config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
+        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
 
     }
 
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index ca4f877..6c35f6e 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -130,7 +130,6 @@ Properties:
 | `acks`                                  | Ignored   | Durability and quorum writes are configured at the namespace level            |
 | `auto.offset.reset`                     | Yes       | Will have a default value of `latest` if user does not give specific setting. |
 | `batch.size`                            | Ignored   |                                                                               |
-| `block.on.buffer.full`                  | Yes       | If true it will block producer, otherwise an error is returned.                          |
 | `bootstrap.servers`                     | Yes       |                                 |
 | `buffer.memory`                         | Ignored   |                                                                               |
 | `client.id`                             | Ignored   |                                                                               |