You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/02 15:51:44 UTC
[pulsar] branch master updated: Issue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new deb6492 Issue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)
deb6492 is described below
commit deb6492b579a062c3b6d8b0e611461d8c26f87d9
Author: Guillaume Rosauro <34...@users.noreply.github.com>
AuthorDate: Tue Jul 2 17:51:39 2019 +0200
Issue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)
* Issue #4638: Update Kafka connect-api to version 2.3.0
* remove 'block.on.buffer.full' property (already removed from kafka)
---
pom.xml | 2 +-
.../clients/consumer/PulsarKafkaConsumer.java | 66 ++++++++++++++++++++++
.../clients/producer/PulsarKafkaProducer.java | 58 +++++++++++--------
.../compat/KafkaProducerInterceptorWrapper.java | 6 +-
.../io/kafka/connect/KafkaConnectSource.java | 5 +-
.../kafka/connect/PulsarIOSourceTaskContext.java | 11 +++-
.../io/kafka/connect/KafkaConnectSourceTest.java | 1 +
site2/docs/adaptors-kafka.md | 1 -
8 files changed, 119 insertions(+), 31 deletions(-)
diff --git a/pom.xml b/pom.xml
index 427035c..9874e06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@ flexible messaging model and an intuitive client API.</description>
<hbc-core.version>2.2.0</hbc-core.version>
<cassandra-driver-core.version>3.6.0</cassandra-driver-core.version>
<aerospike-client.version>4.1.11</aerospike-client.version>
- <kafka-client.version>0.10.2.1</kafka-client.version>
+ <kafka-client.version>2.3.0</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.297</aws-sdk.version>
<avro.version>1.8.2</avro.version>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 0c0059f..bbab13e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -18,6 +18,7 @@
*/
package org.apache.kafka.clients.consumer;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
@@ -308,6 +309,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public void subscribe(Pattern pattern) {
+ throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
+ }
+
+ @Override
public void unsubscribe() {
consumers.values().forEach(c -> {
try {
@@ -386,6 +392,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
}
+ @Override
+ public ConsumerRecords<K, V> poll(Duration duration) {
+ return poll(duration.toMillis());
+ }
+
@SuppressWarnings("unchecked")
private K getKey(String topic, Message<byte[]> msg) {
if (!msg.hasKey()) {
@@ -411,6 +422,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public void commitSync(Duration duration) {
+ commitSync();
+ }
+
+ @Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
doCommitOffsets(offsets).get();
@@ -420,6 +436,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
+ commitSync(map);
+ }
+
+ @Override
public void commitAsync() {
doCommitOffsets(getCurrentOffsetsMap());
}
@@ -517,6 +538,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
+ seek(topicPartition, offsetAndMetadata.offset());
+ }
+
+ @Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -571,6 +597,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
return unpolledPartitions.contains(partition) ? 0 : offset;
}
+ @Override
+ public long position(TopicPartition topicPartition, Duration duration) {
+ return position(topicPartition);
+ }
+
private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
log.info("Resetting partition {} and seeking to {} position", partition, strategy);
if (strategy == SubscriptionInitialPosition.Earliest) {
@@ -587,6 +618,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
+ return committed(topicPartition);
+ }
+
+ @Override
public Map<MetricName, ? extends Metric> metrics() {
throw new UnsupportedOperationException();
}
@@ -597,11 +633,21 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public List<PartitionInfo> partitionsFor(String s, Duration duration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Map<String, List<PartitionInfo>> listTopics() {
throw new UnsupportedOperationException();
}
@Override
+ public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Set<TopicPartition> paused() {
throw new UnsupportedOperationException();
}
@@ -622,16 +668,31 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
}
@Override
+ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
}
@Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
@@ -652,6 +713,11 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
@Override
+ public void close(Duration duration) {
+ close(duration.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
public void wakeup() {
throw new UnsupportedOperationException();
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 8ae3bf3..00c49bb 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -19,6 +19,7 @@
package org.apache.kafka.clients.producer;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
@@ -34,11 +35,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.CompressionType;
@@ -152,13 +155,10 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
- boolean blockOnBufferFull = Boolean
- .parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false"));
-
// Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
// Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
- boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
+ boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
interceptors = (List) producerConfig.getConfiguredInstances(
@@ -166,6 +166,31 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
@Override
+ public void initTransactions() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void beginTransaction() throws ProducerFencedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String s) throws ProducerFencedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void commitTransaction() throws ProducerFencedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void abortTransaction() throws ProducerFencedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
@@ -240,6 +265,11 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
}
}
+ @Override
+ public void close(Duration duration) {
+ close(duration.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
try {
// Add the partitions info for the new topic
@@ -317,25 +347,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
TopicPartition tp = new TopicPartition(topic, partition);
TypedMessageBuilderImpl<byte[]> mb = (TypedMessageBuilderImpl<byte[]>) msgBuilder;
- return new RecordMetadata(tp, offset, 0, mb.getPublishTime(), 0, mb.hasKey() ? mb.getKey().length() : 0, size);
- }
-
- private ProducerInterceptor createKafkaProducerInterceptor(String clazz) {
- try {
- return (ProducerInterceptor) Class.forName(clazz).newInstance();
- } catch (ClassNotFoundException e) {
- String errorMessage = "Can't find Interceptor class: " + e.getMessage();
- logger.error(errorMessage);
- throw new RuntimeException(errorMessage);
- } catch (InstantiationException e) {
- String errorMessage = "Can't initiate provided Interceptor class: " + e.getMessage();
- logger.error(errorMessage);
- throw new RuntimeException(errorMessage);
- } catch (IllegalAccessException e) {
- String errorMessage = "Can't access provided Interceptor class: " + e.getMessage();
- logger.error(errorMessage);
- throw new RuntimeException(errorMessage);
- }
+ return new RecordMetadata(tp, offset, 0L, mb.getPublishTime(), 0L, mb.hasKey() ? mb.getKey().length() : 0, size);
}
private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
index 5cedef9..24697cd 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -141,10 +141,10 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
partitionID = getPartitionID(messageMetadataBuilder);
TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitionID));
kafkaProducerInterceptor.onAcknowledgement(new RecordMetadata(topicPartition,
- -1,
- -1,
+ -1L,
+ -1L,
messageMetadataBuilder.getEventTime(),
- -1,
+ -1L,
message.getKeyBytes().length,
message.getValue().length), new Exception(exception));
} catch (NumberFormatException e) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 3ccbb4b..d9662c4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -105,7 +105,8 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
valueConverter.configure(config, false);
offsetStore = new PulsarOffsetBackingStore();
- offsetStore.configure(new PulsarKafkaWorkerConfig(stringConfig));
+ PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
+ offsetStore.configure(pulsarKafkaWorkerConfig);
offsetStore.start();
offsetReader = new OffsetStorageReaderImpl(
@@ -121,7 +122,7 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
valueConverter
);
- sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader);
+ sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);
sourceTask.initialize(sourceTaskContext);
sourceTask.start(stringConfig);
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
index 561a090..1445534 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarIOSourceTaskContext.java
@@ -21,12 +21,21 @@ package org.apache.pulsar.io.kafka.connect;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
+import java.util.Map;
+
class PulsarIOSourceTaskContext implements SourceTaskContext {
private final OffsetStorageReader reader;
+ private final PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig;
- PulsarIOSourceTaskContext(OffsetStorageReader reader) {
+ PulsarIOSourceTaskContext(OffsetStorageReader reader, PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig) {
this.reader = reader;
+ this.pulsarKafkaWorkerConfig = pulsarKafkaWorkerConfig;
+ }
+
+ @Override
+ public Map<String, String> configs() {
+ return pulsarKafkaWorkerConfig.originalsStrings();
}
@Override
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 7ff8845..4e4bf81 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -67,6 +67,7 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase {
config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
tempFile = File.createTempFile("some-file-name", null);
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
+ config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index ca4f877..6c35f6e 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -130,7 +130,6 @@ Properties:
| `acks` | Ignored | Durability and quorum writes are configured at the namespace level |
| `auto.offset.reset` | Yes | Will have a default value of `latest` if user does not give specific setting. |
| `batch.size` | Ignored | |
-| `block.on.buffer.full` | Yes | If true it will block producer, otherwise an error is returned. |
| `bootstrap.servers` | Yes | |
| `buffer.memory` | Ignored | |
| `client.id` | Ignored | |