Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 13:48:34 UTC

[pulsar] branch branch-2.9 updated: KCA doesn't handle unchecked ConnectException/KafkaException from the task, which may lead to the connector hanging (#12441)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 13d935b  KCA doesn't handle unchecked ConnectException/KafkaException from the task, which may lead to the connector hanging (#12441)
13d935b is described below

commit 13d935b94f3259c22620b2cdb8abe01c9ec2b651
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Oct 22 03:01:42 2021 -0700

    KCA doesn't handle unchecked ConnectException/KafkaException from the task, which may lead to the connector hanging (#12441)
    
    (cherry picked from commit 34f237b3d6ba7c51902fd9574e62f242431db313)
---
 .../kafka/connect/AbstractKafkaConnectSource.java  |  13 +-
 .../io/kafka/connect/ErrFileStreamSourceTask.java  |  31 +++++
 .../kafka/connect/KafkaConnectSourceErrTest.java   | 145 +++++++++++++++++++++
 3 files changed, 185 insertions(+), 4 deletions(-)
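
For context: a Kafka Connect SourceTask's commit() may throw an unchecked
ConnectException or KafkaException. Before this patch, such an exception
escaped the block that completes the adaptor's flushFuture, so the future
was never completed and the next read() could block on it indefinitely. A
minimal, self-contained sketch of that failure mode (names are illustrative,
not the adaptor's actual fields):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class FlushHangSketch {

        // Stand-in for SourceTask.commit(); Kafka Connect tasks may throw
        // unchecked ConnectException/KafkaException from here.
        static void taskCommit() throws InterruptedException {
            throw new RuntimeException("simulated ConnectException");
        }

        public static void main(String[] args) throws Exception {
            CompletableFuture<Void> flushFuture = new CompletableFuture<>();

            Thread flusher = new Thread(() -> {
                try {
                    taskCommit();
                    flushFuture.complete(null); // never reached
                } catch (InterruptedException e) {
                    // Pre-patch, this was the only failure handled.
                    flushFuture.completeExceptionally(e);
                } catch (Throwable t) {
                    // What the patch adds: without this catch, the unchecked
                    // exception kills the flushing thread and flushFuture is
                    // never completed, so a caller blocking on it hangs.
                    flushFuture.completeExceptionally(t);
                }
            });
            flusher.start();

            try {
                flushFuture.get(5, TimeUnit.SECONDS);
                System.out.println("flush ok");
            } catch (ExecutionException ex) {
                System.out.println("flush failed fast: " + ex.getCause());
            } catch (TimeoutException te) {
                System.out.println("flush future never completed (the hang)");
            }
        }
    }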

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 5bd85a0..4612633 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -27,8 +27,6 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.TaskConfig;
@@ -168,6 +166,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
                 } catch (ExecutionException ex) {
                     // log the error, continue execution
                     log.error("execution exception while get flushFuture", ex);
+                    throw new Exception("Flush failed", ex.getCause());
                 } finally {
                     flushFuture = null;
                     currentBatch = null;
@@ -193,7 +192,6 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
 
     private static Map<String, String> PROPERTIES = Collections.emptyMap();
     private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
-    private static long FLUSH_TIMEOUT_MS = 60000;
 
     public abstract class AbstractKafkaSourceRecord<T> implements Record {
         @Getter
@@ -254,8 +252,15 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
                     flushFuture.complete(null);
                 } catch (InterruptedException exception) {
                     log.warn("Flush of {} offsets interrupted, cancelling", this);
+                    Thread.currentThread().interrupt();
                     offsetWriter.cancelFlush();
-                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets"));
+                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", exception));
+                } catch (Throwable t) {
+                    // SourceTask can throw unchecked ConnectException/KafkaException.
+                    // Make sure the flush is cancelled and the future completed exceptionally in that case.
+                    log.warn("Flush of {} offsets failed, cancelling", this);
+                    offsetWriter.cancelFlush();
+                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
                 }
             }
         }
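
Two details in the hunk above are worth calling out. First, read() now
rethrows after logging the ExecutionException, so a failed flush surfaces to
the caller instead of being silently swallowed. Second, the
InterruptedException branch restores the thread's interrupt status before
failing the future, the standard Java idiom whenever InterruptedException is
caught rather than propagated. A minimal illustration of that idiom (not the
adaptor's code):

    public class InterruptIdiom {
        public static void main(String[] args) throws Exception {
            Thread worker = new Thread(() -> {
                try {
                    Thread.sleep(60_000); // stand-in for a blocking flush
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can
                    // still observe the interruption.
                    Thread.currentThread().interrupt();
                    System.out.println("interrupted, flag restored: "
                            + Thread.currentThread().isInterrupted());
                }
            });
            worker.start();
            worker.interrupt();
            worker.join();
        }
    }
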
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrFileStreamSourceTask.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrFileStreamSourceTask.java
new file mode 100644
index 0000000..d17f32c
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrFileStreamSourceTask.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import org.apache.kafka.connect.file.FileStreamSourceTask;
+
+public class ErrFileStreamSourceTask extends FileStreamSourceTask {
+
+    @Override
+    public void commit() throws InterruptedException {
+        throw new org.apache.kafka.connect.errors.ConnectException("blah");
+    }
+
+}
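
This test double only needs to override one hook: commit() is what the
adaptor invokes on the task when flushing offsets, so throwing there
exercises the new Throwable handling. It can be swapped in because the test
below names the task class via TaskConfig.TASK_CLASS_CONFIG, from which the
task is instantiated by name. A generic sketch of that kind of by-name
instantiation (illustrative; not the adaptor's exact loading code):

    import org.apache.kafka.connect.source.SourceTask;

    public class TaskLoaderSketch {
        // Hypothetical helper: instantiate a SourceTask from a class name,
        // the way "task.class"-style configs are typically resolved.
        static SourceTask loadTask(String className) throws Exception {
            return (SourceTask) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        }

        public static void main(String[] args) throws Exception {
            SourceTask task = loadTask(
                    "org.apache.pulsar.io.kafka.connect.ErrFileStreamSourceTask");
            System.out.println("loaded: " + task.getClass().getSimpleName());
        }
    }
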
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrTest.java
new file mode 100644
index 0000000..cc04706
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SourceContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Test the implementation of {@link KafkaConnectSource}.
+ */
+@Slf4j
+public class KafkaConnectSourceErrTest extends ProducerConsumerBase  {
+
+    private Map<String, Object> config = new HashMap<>();
+    private String offsetTopicName;
+    // The topic to publish data to, for kafkaSource
+    private String topicName;
+    private KafkaConnectSource kafkaConnectSource;
+    private File tempFile;
+    private SourceContext context;
+    private PulsarClient client;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.pulsar.io.kafka.connect.ErrFileStreamSourceTask");
+        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+
+        this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
+        config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);
+
+        this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
+        config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
+        tempFile = File.createTempFile("some-file-name", null);
+        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
+        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
+
+        this.context = mock(SourceContext.class);
+        this.client = PulsarClient.builder()
+                .serviceUrl(brokerUrl.toString())
+                .build();
+        when(context.getPulsarClient()).thenReturn(this.client);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (this.client != null) {
+            this.client.close();
+        }
+        tempFile.delete();
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testOpenAndRead() throws Exception {
+        kafkaConnectSource = new KafkaConnectSource();
+        kafkaConnectSource.open(config, context);
+
+        // uses FileStreamSourceConnector: each line is a record and needs a "\n" at the end.
+        OutputStream os = Files.newOutputStream(tempFile.toPath());
+
+        String line1 = "This is the first line\n";
+        os.write(line1.getBytes());
+        os.flush();
+        log.info("write 2 lines.");
+
+        String line2 = "This is the second line\n";
+        os.write(line2.getBytes());
+        os.flush();
+
+        log.info("finish write, will read 2 lines");
+
+        // Note: FileStreamSourceTask reads the whole line as the Value and sets the Key to null.
+        Record<KeyValue<byte[], byte[]>> record = kafkaConnectSource.read();
+        String readBack1 = new String(record.getValue().getValue());
+        assertTrue(line1.contains(readBack1));
+        assertNull(record.getValue().getKey());
+        log.info("read line1: {}", readBack1);
+        record.ack();
+
+        record = kafkaConnectSource.read();
+        String readBack2 = new String(record.getValue().getValue());
+        assertTrue(line2.contains(readBack2));
+        assertNull(record.getValue().getKey());
+        assertTrue(record.getPartitionId().isPresent());
+        assertFalse(record.getPartitionIndex().isPresent());
+        log.info("read line2: {}", readBack2);
+        record.ack();
+
+        String line3 = "This is the 3rd line\n";
+        os.write(line3.getBytes());
+        os.flush();
+
+        try {
+            kafkaConnectSource.read();
+            fail("expected exception");
+        } catch (Exception e) {
+            log.info("got exception", e);
+            assertTrue(e.getCause().getCause() instanceof org.apache.kafka.connect.errors.ConnectException);
+        }
+    }
+}
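
The double getCause() in the final assertion follows from how the patch
wraps the failure: the Throwable from commit() is wrapped in
Exception("Failed to commit offsets", t) when the future is failed, and
read() wraps that again in Exception("Flush failed", ...). Spelled out
(illustrative values, mirroring the two wrapping sites in the patch above):

    import org.apache.kafka.connect.errors.ConnectException;

    public class ExceptionChainSketch {
        public static void main(String[] args) {
            // The chain produced by the patched read() path when the task's
            // commit() throws ConnectException("blah"):
            Exception e = new Exception("Flush failed",
                    new Exception("Failed to commit offsets",
                            new ConnectException("blah")));
            // Exactly what the test unwraps and asserts:
            System.out.println(
                    e.getCause().getCause() instanceof ConnectException); // true
        }
    }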