Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/01 16:28:14 UTC

[GitHub] [pulsar] sijie commented on a change in pull request #9927: Add ability to use Kafka's sinks as pulsar sinks

sijie commented on a change in pull request #9927:
URL: https://github.com/apache/pulsar/pull/9927#discussion_r643256387



##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
+
+@Slf4j
+public class KafkaConnectSink implements Sink<GenericObject> {
+
+    private boolean unwrapKeyValueIfAvailable;
+
+    private static final ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
+    private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeTypeToKafkaSchema;
+
+    static {
+        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
+                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
+                .put(Byte.class, Schema.INT8_SCHEMA)
+                .put(Short.class, Schema.INT16_SCHEMA)
+                .put(Integer.class, Schema.INT32_SCHEMA)
+                .put(Long.class, Schema.INT64_SCHEMA)
+                .put(Float.class, Schema.FLOAT32_SCHEMA)
+                .put(Double.class, Schema.FLOAT64_SCHEMA)
+                .put(String.class, Schema.STRING_SCHEMA)
+                .put(byte[].class, Schema.BYTES_SCHEMA)
+                .build();
+        pulsarSchemaTypeTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
+                .put(SchemaType.BOOLEAN, Schema.BOOLEAN_SCHEMA)
+                .put(SchemaType.INT8, Schema.INT8_SCHEMA)
+                .put(SchemaType.INT16, Schema.INT16_SCHEMA)
+                .put(SchemaType.INT32, Schema.INT32_SCHEMA)
+                .put(SchemaType.INT64, Schema.INT64_SCHEMA)
+                .put(SchemaType.FLOAT, Schema.FLOAT32_SCHEMA)
+                .put(SchemaType.DOUBLE, Schema.FLOAT64_SCHEMA)
+                .put(SchemaType.STRING, Schema.STRING_SCHEMA)
+                .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
+                .build();
+    }
+
+    private PulsarKafkaSinkContext sinkContext;
+    @VisibleForTesting
+    PulsarKafkaSinkTaskContext taskContext;
+    private SinkConnector connector;
+    private SinkTask task;
+
+    private long maxBatchSize;
+    private final AtomicLong currentBatchSize = new AtomicLong(0L);
+
+    private long lingerMs;
+    private final ScheduledExecutorService scheduledExecutor =
+            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+                    .setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d")
+                    .build());
+    private final ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = new ConcurrentLinkedDeque<>();
+    private volatile boolean isRunning = false;
+
+    private final Properties props = new Properties();
+    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
+
+    protected String topicName;
+
+    @Override
+    public void write(Record<GenericObject> sourceRecord) {
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", sourceRecord);
+        }
+
+        if (!isRunning) {
+            log.warn("Sink is stopped. Cannot send the record {}", sourceRecord);
+            sourceRecord.fail();
+            return;
+        }
+
+        // While sourceRecord.getMessage() is Optional<>,
+        // it should always be present in a Sink, which gets an instance of PulsarRecord;
+        // let's avoid checks for .isPresent() in the rest of the code.
+        Preconditions.checkArgument(sourceRecord.getMessage().isPresent());
+        try {
+            SinkRecord record = toSinkRecord(sourceRecord);
+            task.put(Lists.newArrayList(record));
+        } catch (Exception ex) {
+            log.error("Error sending the record {}", sourceRecord, ex);
+            sourceRecord.fail();
+            return;
+        }
+        pendingFlushQueue.add(sourceRecord);
+        currentBatchSize.addAndGet(sourceRecord.getMessage().get().size());
+        flushIfNeeded(false);
+    }
+
+    @Override
+    public void close() throws Exception {
+        isRunning = false;
+        flushIfNeeded(true);
+        scheduledExecutor.shutdown();
+        if (!scheduledExecutor.awaitTermination(10 * lingerMs, TimeUnit.MILLISECONDS)) {
+            log.error("scheduledExecutor did not terminate in {} ms", 10 * lingerMs);
+        }
+
+        task.stop();
+        connector.stop();
+        taskContext.close();
+
+        log.info("Kafka sink stopped.");
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
+        kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
+        Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set");
+        Preconditions.checkArgument(ctx.getSubscriptionType() == SubscriptionType.Failover
+                || ctx.getSubscriptionType() == SubscriptionType.Exclusive,
+                "Sink must run with Exclusive or Failover subscription type");
+        topicName = kafkaSinkConfig.getTopic();
+        unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
+
+        String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass();
+        kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(props::put);
+
+        Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
+        connector = (SinkConnector) clazz.getConstructor().newInstance();
+
+        Class<? extends Task> taskClass = connector.taskClass();
+        sinkContext = new PulsarKafkaSinkContext();
+        connector.initialize(sinkContext);
+        connector.start(Maps.fromProperties(props));
+
+        List<Map<String, String>> configs = connector.taskConfigs(1);
+        Preconditions.checkNotNull(configs);
+        Preconditions.checkArgument(configs.size() == 1);
+
+        configs.forEach(x -> {
+            x.put(OFFSET_STORAGE_TOPIC_CONFIG, kafkaSinkConfig.getOffsetStorageTopic());
+            x.put(PULSAR_SERVICE_URL_CONFIG, kafkaSinkConfig.getPulsarServiceUrl());
+        });
+        task = (SinkTask) taskClass.getConstructor().newInstance();
+        taskContext =
+                new PulsarKafkaSinkTaskContext(configs.get(0), ctx, task::open);
+        task.initialize(taskContext);
+        task.start(configs.get(0));
+
+        maxBatchSize = kafkaSinkConfig.getBatchSize();
+        lingerMs = kafkaSinkConfig.getLingerTimeMs();
+        scheduledExecutor.scheduleAtFixedRate(() ->
+                this.flushIfNeeded(true), lingerMs, lingerMs, TimeUnit.MILLISECONDS);
+
+
+        isRunning = true;
+        log.info("Kafka sink started : {}.", props);
+    }
+
+    private void flushIfNeeded(boolean force) {
+        if (force || currentBatchSize.get() >= maxBatchSize) {

Review comment:
       @dlg99 This will cause multiple flush tasks to be submitted, no? Because `currentBatchSize` will always be larger than `maxBatchSize` before the `flush` action is taken.
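
For illustration, a minimal sketch of one way to avoid duplicate submissions, assuming the fields shown in the diff above (`currentBatchSize`, `maxBatchSize`, `scheduledExecutor`, `pendingFlushQueue`) and a `flush()` method that drains the queue and resets the counter; the `isFlushScheduled` guard is hypothetical, not part of the PR:

```java
// Hypothetical guard (requires java.util.concurrent.atomic.AtomicBoolean):
// at most one flush task may be in flight at a time.
private final AtomicBoolean isFlushScheduled = new AtomicBoolean(false);

private void flushIfNeeded(boolean force) {
    if ((force || currentBatchSize.get() >= maxBatchSize)
            && isFlushScheduled.compareAndSet(false, true)) {
        scheduledExecutor.submit(() -> {
            try {
                flush(); // assumed to drain pendingFlushQueue and reset currentBatchSize
            } finally {
                isFlushScheduled.set(false);
            }
        });
    }
}
```

With such a guard, calls made while `currentBatchSize` still exceeds `maxBatchSize` become no-ops instead of enqueueing extra flush tasks.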

##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+@Data
+@Accessors(chain = true)
+public class PulsarKafkaConnectSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+            defaultValue = "16384",
+            help = "Size of messages in bytes the sink will attempt to batch messages together before flush.")
+    private int batchSize = 16384;
+
+    @FieldDoc(
+            defaultValue = "2147483647L",
+            help = "Time interval in milliseconds the sink will attempt to batch messages together before flush.")
+    private long lingerTimeMs = 2147483647L;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "The Kafka topic name that passed to kafka sink.")
+    private String topic;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "A kafka-connector sink class to use.")
+    private String kafkaConnectorSinkClass;
+
+    @FieldDoc(
+            defaultValue = "",
+            help = "Config properties to pass to the kafka connector.")
+    private Map<String, String> kafkaConnectorConfigProperties;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "Pulsar topic to store offsets at.")
+    private String offsetStorageTopic;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "Pulsar service URL to use for the offset store.")
+    private String pulsarServiceUrl;

Review comment:
       This field will potentially be removed once PulsarClient is exposed through the context, so you might want to document that.
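
For context, a hedged sketch of how these fields might be supplied when the sink is opened; every key and value below is an illustrative placeholder, and `load()` is assumed to map keys onto the field names documented above:

```java
import java.util.Map;

// Hypothetical configuration for KafkaConnectSink.open(); keys mirror the
// PulsarKafkaConnectSinkConfig fields, values are placeholders only.
Map<String, Object> config = Map.of(
        "topic", "my-kafka-topic",
        "kafkaConnectorSinkClass", "org.example.kafka.MySinkConnector",
        "offsetStorageTopic", "persistent://public/default/kafka-sink-offsets",
        // Per the comment above, this may go away once PulsarClient is
        // exposed through the context:
        "pulsarServiceUrl", "pulsar://localhost:6650",
        "batchSize", 16384,
        "lingerTimeMs", 60000L);
PulsarKafkaConnectSinkConfig sinkConfig = PulsarKafkaConnectSinkConfig.load(config);
```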

##########
File path: pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
##########
+    private void flushIfNeeded(boolean force) {
+        if (force || currentBatchSize.get() >= maxBatchSize) {

Review comment:
       This will potentially enqueue a lot of tasks in the flush queue, which will burn CPU.
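
One hedged variant that speaks to both review comments above: claim the batched bytes at the moment a flush is scheduled, so subsequent `write()` calls accumulate toward the next flush instead of re-triggering this one. It assumes, as the comments suggest, that `flushIfNeeded` submits a `flush()` task to `scheduledExecutor`:

```java
// Hypothetical alternative: getAndSet(0) claims the batched bytes up front,
// so concurrent or subsequent callers see a counter below maxBatchSize and
// do not enqueue duplicate flush tasks for the same batch.
private void flushIfNeeded(boolean force) {
    if (force || currentBatchSize.get() >= maxBatchSize) {
        long claimed = currentBatchSize.getAndSet(0L);
        if (force || claimed > 0) {
            scheduledExecutor.submit(this::flush); // flush() drains pendingFlushQueue
        }
    }
}
```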




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org