You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/08/25 21:04:07 UTC

[kafka] branch 2.3 updated: KAFKA-8586: Fail source tasks when producers fail to send records (#6993)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 1a5062c  KAFKA-8586: Fail source tasks when producers fail to send records (#6993)
1a5062c is described below

commit 1a5062cc8dd4dc78722040b69d8456052aef5a3d
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Sun Aug 25 13:54:00 2019 -0700

    KAFKA-8586: Fail source tasks when producers fail to send records (#6993)
    
    Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions reported by the `producer.send(...)` callback (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.
    
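    Note that most producer errors are retriable, and Connect will (by default) configure each producer with at most 1 in-flight request per connection and effectively infinite retries, so retriable errors are retried by the producer rather than reported to the task. This change only affects non-retriable errors.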
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  4 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    | 27 ++++++++----
 .../connect/runtime/WorkerSourceTaskTest.java      | 51 +++++++++++++++++-----
 3 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f848a18..f4b3124 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -545,8 +545,8 @@ public class Worker {
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
-        // worker, but this may compromise the delivery guarantees of Kafka Connect.
+        // These settings make the producer retry indefinitely on retriable exceptions. They *may* be overridden via configs passed to
+        // the worker, but this may compromise the delivery guarantees of Kafka Connect.
         producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
         producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index bb1ad77..097ac3d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -57,6 +57,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * WorkerTask that uses a SourceTask to ingest data into Kafka.
@@ -78,6 +79,7 @@ class WorkerSourceTask extends WorkerTask {
     private final OffsetStorageWriter offsetWriter;
     private final Time time;
     private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
+    private final AtomicReference<Exception> producerSendException;
 
     private List<SourceRecord> toSend;
     private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
@@ -133,6 +135,7 @@ class WorkerSourceTask extends WorkerTask {
         this.flushing = false;
         this.stopRequestedLatch = new CountDownLatch(1);
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
+        this.producerSendException = new AtomicReference<>();
     }
 
     @Override
@@ -215,6 +218,8 @@ class WorkerSourceTask extends WorkerTask {
                     continue;
                 }
 
+                maybeThrowProducerSendException();
+
                 if (toSend == null) {
                     log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                     long start = time.milliseconds();
@@ -240,6 +245,15 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
+    private void maybeThrowProducerSendException() {
+        if (producerSendException.get() != null) {
+            throw new ConnectException(
+                "Unrecoverable exception from producer send callback",
+                producerSendException.get()
+            );
+        }
+    }
+
     protected List<SourceRecord> poll() throws InterruptedException {
         try {
             return task.poll();
@@ -288,6 +302,7 @@ class WorkerSourceTask extends WorkerTask {
         recordBatch(toSend.size());
         final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
         for (final SourceRecord preTransformRecord : toSend) {
+            maybeThrowProducerSendException();
 
             retryWithToleranceOperator.sourceRecord(preTransformRecord);
             final SourceRecord record = transformationChain.apply(preTransformRecord);
@@ -322,22 +337,18 @@ class WorkerSourceTask extends WorkerTask {
                             @Override
                             public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                 if (e != null) {
-                                    // Given the default settings for zero data loss, this should basically never happen --
-                                    // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
-                                    // timeouts, callbacks with exceptions should never be invoked in practice. If the
-                                    // user overrode these settings, the best we can do is notify them of the failure via
-                                    // logging.
-                                    log.error("{} failed to send record to {}: {}", WorkerSourceTask.this, topic, e);
+                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
                                     log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                                    producerSendException.compareAndSet(null, e);
                                 } else {
+                                    recordSent(producerRecord);
+                                    counter.completeRecord();
                                     log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                                             WorkerSourceTask.this,
                                             recordMetadata.topic(), recordMetadata.partition(),
                                             recordMetadata.offset());
                                     commitTaskRecord(preTransformRecord);
                                 }
-                                recordSent(producerRecord);
-                                counter.completeRecord();
                             }
                         });
                 lastSendFailed = false;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 24a13c2..1f5cc43 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,9 +21,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@@ -542,6 +544,21 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerCallbackFail() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectSendRecordProducerCallbackFail();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
     @Test
     public void testSendRecordsTaskCommitRecordFail() throws Exception {
         createWorkerTask();
@@ -711,16 +728,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         return expectSendRecordTaskCommitRecordSucceed(false, isRetry);
     }
 
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
+        return expectSendRecord(false, false, false, false);
+    }
+
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
-        return expectSendRecord(anyTimes, isRetry, true);
+        return expectSendRecord(anyTimes, isRetry, true, true);
     }
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
-        return expectSendRecord(anyTimes, isRetry, false);
+        return expectSendRecord(anyTimes, isRetry, true, false);
     }
 
-    @SuppressWarnings("unchecked")
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
+        boolean anyTimes,
+        boolean isRetry,
+        boolean sendSuccess,
+        boolean commitSuccess
+    ) throws InterruptedException {
         expectConvertKeyValue(anyTimes);
         expectApplyTransformationChain(anyTimes);
 
@@ -737,15 +762,19 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // 2. Converted data passed to the producer, which will need callbacks invoked for flush to work
         IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent),
-                        EasyMock.capture(producerCallbacks)));
+            producer.send(EasyMock.capture(sent),
+                EasyMock.capture(producerCallbacks)));
         IAnswer<Future<RecordMetadata>> expectResponse = new IAnswer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer() throws Throwable {
                 synchronized (producerCallbacks) {
                     for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
-                        cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
-                                                           0L, 0L, 0, 0), null);
+                        if (sendSuccess) {
+                            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
+                                0L, 0L, 0, 0), null);
+                        } else {
+                            cb.onCompletion(null, new TopicAuthorizationException("foo"));
+                        }
                     }
                     producerCallbacks.reset();
                 }
@@ -757,8 +786,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         else
             expect.andAnswer(expectResponse);
 
-        // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
-        expectTaskCommitRecord(anyTimes, succeed);
+        if (sendSuccess) {
+            // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
+            expectTaskCommitRecord(anyTimes, commitSuccess);
+        }
 
         return sent;
     }