You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/25 15:34:24 UTC

[GitHub] sijie closed pull request #2627: Debezium: kafka connect offset store

sijie closed pull request #2627: Debezium: kafka connect offset store
URL: https://github.com/apache/pulsar/pull/2627
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index 5fefa43a74..52e589cfc7 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -50,6 +50,12 @@
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-runtime</artifactId>
+      <version>${kafka-client.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarKafkaWorkerConfig.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarKafkaWorkerConfig.java
new file mode 100644
index 0000000000..a4f65e279f
--- /dev/null
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarKafkaWorkerConfig.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+
+/**
+ * Kafka Connect {@link WorkerConfig} holding the settings read by the Pulsar-backed offset
+ * store: the topic that stores offsets and the url of the Pulsar cluster hosting it.
+ */
+public class PulsarKafkaWorkerConfig extends WorkerConfig {
+
+    /**
+     * <code>offset.storage.topic</code>: the pulsar topic that offsets are written to and replayed from.
+     */
+    public static final String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
+    private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "pulsar topic to store kafka connector offsets in";
+
+    /**
+     * <code>pulsar.service.url</code>: the url of the pulsar cluster that hosts the offset topic.
+     */
+    public static final String PULSAR_SERVICE_URL_CONFIG = "pulsar.service.url";
+    private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar service url";
+
+    /** Definition of the two keys above, built once and shared by every instance. */
+    private static final ConfigDef CONFIG = buildConfigDef();
+
+    /**
+     * Builds the definition of this config: both keys are {@link Type#STRING} with
+     * {@link Importance#HIGH} and are declared without a default value.
+     */
+    private static ConfigDef buildConfigDef() {
+        ConfigDef definition = new ConfigDef();
+        definition.define(OFFSET_STORAGE_TOPIC_CONFIG, Type.STRING, Importance.HIGH,
+            OFFSET_STORAGE_TOPIC_CONFIG_DOC);
+        definition.define(PULSAR_SERVICE_URL_CONFIG, Type.STRING, Importance.HIGH,
+            PULSAR_SERVICE_URL_CONFIG_DOC);
+        return definition;
+    }
+
+    /**
+     * Creates the config from raw worker properties.
+     *
+     * @param props worker properties, interpreted by {@link WorkerConfig} using the definition above
+     */
+    public PulsarKafkaWorkerConfig(Map<String, String> props) {
+        super(CONFIG, props);
+    }
+}
diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStore.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStore.java
new file mode 100644
index 0000000000..80d37bdff3
--- /dev/null
+++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStore.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Implementation of {@link OffsetBackingStore} that uses a Pulsar topic to store offset data.
+ *
+ * <p>{@link #set} publishes one message per entry, keyed by the offset key decoded as UTF-8 and
+ * carrying the offset value as payload. A {@link Reader} replays the topic from the earliest
+ * message into an in-memory map, which {@link #get} serves after catching up with the topic.
+ */
+@Slf4j
+public class PulsarOffsetBackingStore implements OffsetBackingStore {
+
+    // Latest value read from the topic for each key; accessed while synchronized on the map.
+    private Map<ByteBuffer, ByteBuffer> data;
+    private PulsarClient client;
+    private String serviceUrl;
+    private String topic;
+    private Producer<byte[]> producer;
+    private Reader<byte[]> reader;
+    // The read-to-end pass currently in flight (possibly already completed), or null.
+    private volatile CompletableFuture<Void> outstandingReadToEnd = null;
+
+    /**
+     * Reads the offset topic and the Pulsar service url from {@code workerConfig} and resets the
+     * in-memory offset map.
+     *
+     * @throws IllegalArgumentException if the topic or the service url is blank
+     */
+    @Override
+    public void configure(WorkerConfig workerConfig) {
+        this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
+        checkArgument(!isBlank(topic), "Offset storage topic must be specified");
+        this.serviceUrl = workerConfig.getString(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
+        // Point at the key that is actually read above, not at Kafka's bootstrap servers setting.
+        checkArgument(!isBlank(serviceUrl), "Pulsar service url must be specified at `"
+            + PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG + "`");
+        this.data = new HashMap<>();
+        log.info("Configure offset backing store on pulsar topic {} at cluster {}",
+            topic, serviceUrl);
+    }
+
+    /**
+     * Completes {@code future} once the reader has consumed every message available on the topic
+     * at some point after this call, flushing this store's pending writes first.
+     *
+     * <p>At most one pass runs at a time. A pass already in flight may have checked for available
+     * messages before this call, so rather than sharing its result the caller waits for it to finish
+     * and then starts a new pass (or joins one that began after that point).
+     */
+    void readToEnd(CompletableFuture<Void> future) {
+        readToEnd(future, false);
+    }
+
+    /**
+     * Implements {@link #readToEnd(CompletableFuture)}.
+     *
+     * @param future completed when the pass this caller relies on finishes
+     * @param mayJoinInFlightPass true once the caller has waited out the pass that was in flight
+     *                            when it first asked, so any pass found now started afterwards
+     */
+    private void readToEnd(CompletableFuture<Void> future, boolean mayJoinInFlightPass) {
+        CompletableFuture<Void> inFlight;
+        synchronized (this) {
+            inFlight = outstandingReadToEnd;
+            if (inFlight == null || inFlight.isDone()) {
+                // No usable pass in flight: this caller drives a new one.
+                inFlight = null;
+                outstandingReadToEnd = future;
+                future.whenComplete((result, cause) -> {
+                    synchronized (PulsarOffsetBackingStore.this) {
+                        // Clear the slot only if a newer pass has not taken it over already.
+                        if (outstandingReadToEnd == future) {
+                            outstandingReadToEnd = null;
+                        }
+                    }
+                });
+            }
+        }
+        if (inFlight != null) {
+            inFlight.whenComplete((result, cause) -> {
+                if (!mayJoinInFlightPass) {
+                    readToEnd(future, true);
+                } else if (null != cause) {
+                    future.completeExceptionally(cause);
+                } else {
+                    future.complete(result);
+                }
+            });
+            return;
+        }
+        producer.flushAsync().whenComplete((ignored, cause) -> {
+            if (null != cause) {
+                future.completeExceptionally(cause);
+            } else {
+                checkAndReadNext(future);
+            }
+        });
+    }
+
+    /**
+     * Reads the next message if the reader reports one is available, otherwise completes
+     * {@code endFuture}.
+     *
+     * <p>NOTE(review): if the client hands back already-completed futures (e.g. messages are
+     * already buffered), this callback chain recurses once per message; confirm the stack depth
+     * is acceptable when replaying a large topic.
+     */
+    private void checkAndReadNext(CompletableFuture<Void> endFuture) {
+        reader.hasMessageAvailableAsync().whenComplete((hasMessageAvailable, cause) -> {
+            if (null != cause) {
+                endFuture.completeExceptionally(cause);
+            } else {
+                if (hasMessageAvailable) {
+                    readNext(endFuture);
+                } else {
+                    endFuture.complete(null);
+                }
+            }
+        });
+    }
+
+    /** Reads one message, applies it to the in-memory map and continues the pass. */
+    private void readNext(CompletableFuture<Void> endFuture) {
+        reader.readNextAsync().whenComplete((message, cause) -> {
+            if (null != cause) {
+                endFuture.completeExceptionally(cause);
+                return;
+            }
+            try {
+                processMessage(message);
+            } catch (RuntimeException e) {
+                // Nothing observes the stage returned by whenComplete, so an escaping
+                // exception would leave endFuture (and every caller waiting on it) pending forever.
+                endFuture.completeExceptionally(e);
+                return;
+            }
+            checkAndReadNext(endFuture);
+        });
+    }
+
+    /**
+     * Records the message payload as the latest value for the message key.
+     *
+     * <p>{@link #set} always attaches a key, so a message without one is not an offset record;
+     * it is skipped rather than interpreted as one.
+     */
+    void processMessage(Message<byte[]> message) {
+        if (!message.hasKey()) {
+            log.warn("Skipping message {} without a key on offset storage topic {}",
+                message.getMessageId(), topic);
+            return;
+        }
+        synchronized (data) {
+            data.put(
+                ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
+                ByteBuffer.wrap(message.getValue()));
+        }
+    }
+
+    /**
+     * Creates the Pulsar client, the producer and a reader positioned at the earliest message, then
+     * blocks until the whole topic has been replayed into the in-memory map.
+     *
+     * @throws RuntimeException if the client, producer or reader cannot be created; whatever was
+     *                          created before the failure is closed first
+     */
+    @Override
+    public void start() {
+        try {
+            client = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .build();
+            log.info("Successfully created pulsar client to {}", serviceUrl);
+            producer = client.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+            log.info("Successfully created producer to produce updates to topic {}", topic);
+            reader = client.newReader(Schema.BYTES)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+            log.info("Successfully created reader to replay updates from topic {}", topic);
+        } catch (PulsarClientException e) {
+            log.error("Failed to create pulsar client to cluster at {}", serviceUrl, e);
+            // Do not leak the client (and producer) created before the failing step.
+            stop();
+            throw new RuntimeException("Failed to create pulsar client to cluster at " + serviceUrl, e);
+        }
+        CompletableFuture<Void> endFuture = new CompletableFuture<>();
+        readToEnd(endFuture);
+        endFuture.join();
+    }
+
+    /** Closes the producer, the reader and the client; close failures are logged, not rethrown. */
+    @Override
+    public void stop() {
+        if (null != producer) {
+            try {
+                producer.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close producer", e);
+            }
+        }
+        if (null != reader) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                log.warn("Failed to close reader", e);
+            }
+        }
+        if (null != client) {
+            try {
+                client.close();
+            } catch (IOException e) {
+                log.warn("Failed to close client", e);
+            }
+        }
+    }
+
+    /**
+     * Catches up with the topic, then returns the stored values of the requested keys; keys with
+     * no stored value are left out of the result.
+     *
+     * @param callback optional; invoked exactly once with either the result or the failure
+     */
+    @Override
+    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys,
+                                                   Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+        CompletableFuture<Void> endFuture = new CompletableFuture<>();
+        readToEnd(endFuture);
+        return endFuture.thenApply(ignored -> {
+            Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
+            synchronized (data) {
+                for (ByteBuffer key : keys) {
+                    ByteBuffer value = data.get(key);
+                    if (null != value) {
+                        values.put(key, value);
+                    }
+                }
+            }
+            return values;
+        }).whenComplete((values, cause) -> {
+            // Notify from a single place so a throwing callback is not invoked a second time.
+            if (null != callback) {
+                callback.onCompletion(cause, values);
+            }
+        });
+    }
+
+    /**
+     * Publishes one message per entry and completes once every message has been sent and the
+     * producer flushed; on success a read-to-end pass is started to refresh the in-memory map.
+     *
+     * <p>NOTE(review): a null value would presumably fail in {@code Unpooled.wrappedBuffer} and
+     * throw from this method; confirm callers never pass null values.
+     *
+     * @param callback optional; invoked with the outcome of the write
+     */
+    @Override
+    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
+        List<CompletableFuture<?>> writes = new ArrayList<>(values.size() + 1);
+        values.forEach((key, value) -> {
+            ByteBuf bb = Unpooled.wrappedBuffer(key);
+            byte[] keyBytes = ByteBufUtil.getBytes(bb);
+            bb = Unpooled.wrappedBuffer(value);
+            byte[] valBytes = ByteBufUtil.getBytes(bb);
+            writes.add(producer.newMessage()
+                .key(new String(keyBytes, UTF_8))
+                .value(valBytes)
+                .sendAsync());
+        });
+        // Keep the flush so batched messages go out promptly, but take each write's outcome from
+        // its own send future instead of relying on the flush result alone.
+        writes.add(producer.flushAsync());
+
+        return CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0]))
+            .whenComplete((ignored, cause) -> {
+                if (null != callback) {
+                    callback.onCompletion(cause, ignored);
+                }
+                if (null == cause) {
+                    readToEnd(new CompletableFuture<>());
+                }
+            });
+    }
+}
diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStoreTest.java b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStoreTest.java
new file mode 100644
index 0000000000..4649061bf0
--- /dev/null
+++ b/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarOffsetBackingStoreTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.debezium;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test the implementation of {@link PulsarOffsetBackingStore}.
+ */
+@Slf4j
+public class PulsarOffsetBackingStoreTest extends ProducerConsumerBase {
+
+    private Map<String, String> defaultProps = new HashMap<>();
+    private PulsarKafkaWorkerConfig distributedConfig;
+    private String topicName;
+    private PulsarOffsetBackingStore offsetBackingStore;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        this.topicName = "persistent://my-property/my-ns/offset-topic";
+        this.defaultProps.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, brokerUrl.toString());
+        this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicName);
+        this.distributedConfig = new PulsarKafkaWorkerConfig(this.defaultProps);
+        this.offsetBackingStore = new PulsarOffsetBackingStore();
+        this.offsetBackingStore.configure(distributedConfig);
+        this.offsetBackingStore.start();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        if (null != offsetBackingStore) {
+            offsetBackingStore.stop();
+            offsetBackingStore = null;
+        }
+
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testGetFromEmpty() throws Exception {
+        assertTrue(offsetBackingStore.get(
+            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
+            null
+        ).get().isEmpty());
+    }
+
+    @Test
+    public void testGetFromEmptyCallback() throws Exception {
+        CompletableFuture<Map<ByteBuffer, ByteBuffer>> callbackFuture = new CompletableFuture<>();
+        assertTrue(offsetBackingStore.get(
+            Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8))),
+            (error, result) -> {
+                if (null != error) {
+                    callbackFuture.completeExceptionally(error);
+                } else {
+                    callbackFuture.complete(result);
+                }
+            }
+        ).get().isEmpty());
+        assertTrue(callbackFuture.get().isEmpty());
+    }
+
+    @Test
+    public void testGetSet() throws Exception {
+        testGetSet(false);
+    }
+
+    @Test
+    public void testGetSetCallback() throws Exception {
+        testGetSet(true);
+    }
+
+    /**
+     * Stores ten key/value pairs one {@code set} at a time, then reads them all back with a single
+     * {@code get}.
+     *
+     * @param testCallback whether to pass a callback to {@code set} and also wait for it
+     */
+    private void testGetSet(boolean testCallback) throws Exception {
+        final int numKeys = 10;
+        final List<ByteBuffer> keys = new ArrayList<>();
+        for (int i = 0; i < numKeys; i++) {
+            Map<ByteBuffer, ByteBuffer> kvs = new HashMap<>();
+            ByteBuffer key = ByteBuffer.wrap(("test-key-" + i).getBytes(UTF_8));
+            keys.add(key);
+            kvs.put(
+                key,
+                ByteBuffer.wrap(("test-val-" + i).getBytes(UTF_8)));
+            CompletableFuture<Void> setCallback = new CompletableFuture<>();
+            offsetBackingStore.set(
+                kvs,
+                testCallback ? (Callback<Void>) (error, result) -> {
+                    if (null != error) {
+                        setCallback.completeExceptionally(error);
+                    } else {
+                        setCallback.complete(result);
+                    }
+                } : null
+            ).get();
+            if (testCallback) {
+                setCallback.join();
+            }
+        }
+
+        Map<ByteBuffer, ByteBuffer> result =
+            offsetBackingStore.get(keys, null).get();
+        // TestNG's assertEquals takes (actual, expected).
+        assertEquals(result.size(), numKeys);
+        AtomicInteger count = new AtomicInteger();
+        // TreeMap orders the ByteBuffer keys by content: test-key-0 .. test-key-9.
+        new TreeMap<>(result).forEach((key, value) -> {
+            int idx = count.getAndIncrement();
+            byte[] keyData = ByteBufUtil.getBytes(Unpooled.wrappedBuffer(key));
+            assertEquals(new String(keyData, UTF_8), "test-key-" + idx);
+            byte[] valData = ByteBufUtil.getBytes(Unpooled.wrappedBuffer(value));
+            assertEquals(new String(valData, UTF_8), "test-val-" + idx);
+        });
+    }
+
+    /**
+     * Offsets written by one store must be visible to a new store that replays the same topic.
+     */
+    @Test
+    public void testGetAfterRestart() throws Exception {
+        ByteBuffer key = ByteBuffer.wrap("restart-key".getBytes(UTF_8));
+        Map<ByteBuffer, ByteBuffer> kvs = new HashMap<>();
+        kvs.put(key, ByteBuffer.wrap("restart-val".getBytes(UTF_8)));
+        offsetBackingStore.set(kvs, null).get();
+        offsetBackingStore.stop();
+
+        // The fresh store starts with an empty map and can only learn the value from the topic.
+        offsetBackingStore = new PulsarOffsetBackingStore();
+        offsetBackingStore.configure(distributedConfig);
+        offsetBackingStore.start();
+
+        Map<ByteBuffer, ByteBuffer> result =
+            offsetBackingStore.get(Arrays.asList(key), null).get();
+        assertEquals(result.size(), 1);
+        byte[] valData = ByteBufUtil.getBytes(Unpooled.wrappedBuffer(result.get(key)));
+        assertEquals(new String(valData, UTF_8), "restart-val");
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services