You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/08/25 22:08:00 UTC

[kafka] branch 1.0 updated: KAFKA-8586: Fail source tasks when producers fail to send records (#6993)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new ac98d95  KAFKA-8586: Fail source tasks when producers fail to send records (#6993)
ac98d95 is described below

commit ac98d955f6ccc41c8484a6aea39feac91b23e143
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Sun Aug 25 13:54:00 2019 -0700

    KAFKA-8586: Fail source tasks when producers fail to send records (#6993)
    
    Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.
    
    Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
---
 .../kafka/connect/runtime/WorkerSourceTask.java    | 43 +++++++++++++-----
 .../connect/runtime/WorkerSourceTaskTest.java      | 51 +++++++++++++++++-----
 2 files changed, 73 insertions(+), 21 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 27dd388..d0257b9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -49,6 +48,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * WorkerTask that uses a SourceTask to ingest data into Kafka.
@@ -68,6 +68,7 @@ class WorkerSourceTask extends WorkerTask {
     private final OffsetStorageWriter offsetWriter;
     private final Time time;
     private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
+    private final AtomicReference<Exception> producerSendException;
 
     private List<SourceRecord> toSend;
     private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
@@ -117,6 +118,7 @@ class WorkerSourceTask extends WorkerTask {
         this.flushing = false;
         this.stopRequestedLatch = new CountDownLatch(1);
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
+        this.producerSendException = new AtomicReference<>();
     }
 
     @Override
@@ -198,10 +200,12 @@ class WorkerSourceTask extends WorkerTask {
                     continue;
                 }
 
+                maybeThrowProducerSendException();
+
                 if (toSend == null) {
                     log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                     long start = time.milliseconds();
-                    toSend = task.poll();
+                    toSend = poll();
                     if (toSend != null) {
                         recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
@@ -223,6 +227,25 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
+    private void maybeThrowProducerSendException() {
+        if (producerSendException.get() != null) {
+            throw new ConnectException(
+                "Unrecoverable exception from producer send callback",
+                producerSendException.get()
+            );
+        }
+    }
+
+    protected List<SourceRecord> poll() throws InterruptedException {
+        try {
+            return task.poll();
+        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
+            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
+            // Do nothing. Let the framework poll whenever it's ready.
+            return null;
+        }
+    }
+
     /**
      * Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can
      * be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.
@@ -233,6 +256,8 @@ class WorkerSourceTask extends WorkerTask {
         recordBatch(toSend.size());
         final SourceRecordWriteCounter counter = new SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
         for (final SourceRecord preTransformRecord : toSend) {
+            maybeThrowProducerSendException();
+
             final SourceRecord record = transformationChain.apply(preTransformRecord);
 
             if (record == null) {
@@ -269,26 +294,22 @@ class WorkerSourceTask extends WorkerTask {
                             @Override
                             public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                 if (e != null) {
-                                    // Given the default settings for zero data loss, this should basically never happen --
-                                    // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
-                                    // timeouts, callbacks with exceptions should never be invoked in practice. If the
-                                    // user overrode these settings, the best we can do is notify them of the failure via
-                                    // logging.
-                                    log.error("{} failed to send record to {}: {}", WorkerSourceTask.this, topic, e);
+                                    log.error("{} failed to send record to {}:", WorkerSourceTask.this, topic, e);
                                     log.debug("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
+                                    producerSendException.compareAndSet(null, e);
                                 } else {
+                                    recordSent(producerRecord);
+                                    counter.completeRecord();
                                     log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                                             WorkerSourceTask.this,
                                             recordMetadata.topic(), recordMetadata.partition(),
                                             recordMetadata.offset());
                                     commitTaskRecord(preTransformRecord);
                                 }
-                                recordSent(producerRecord);
-                                counter.completeRecord();
                             }
                         });
                 lastSendFailed = false;
-            } catch (RetriableException e) {
+            } catch (org.apache.kafka.common.errors.RetriableException e) {
                 log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index dfdf415..2d6794a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,9 +21,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -534,6 +536,21 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerCallbackFail() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectSendRecordProducerCallbackFail();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
     @Test
     public void testSendRecordsTaskCommitRecordFail() throws Exception {
         createWorkerTask();
@@ -703,16 +720,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         return expectSendRecordTaskCommitRecordSucceed(false, isRetry);
     }
 
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
+        return expectSendRecord(false, false, false, false);
+    }
+
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
-        return expectSendRecord(anyTimes, isRetry, true);
+        return expectSendRecord(anyTimes, isRetry, true, true);
     }
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
-        return expectSendRecord(anyTimes, isRetry, false);
+        return expectSendRecord(anyTimes, isRetry, true, false);
     }
 
-    @SuppressWarnings("unchecked")
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
+        final boolean anyTimes,
+        final boolean isRetry,
+        final boolean sendSuccess,
+        final boolean commitSuccess
+    ) throws InterruptedException {
         expectConvertKeyValue(anyTimes);
         expectApplyTransformationChain(anyTimes);
 
@@ -729,15 +754,19 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // 2. Converted data passed to the producer, which will need callbacks invoked for flush to work
         IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent),
-                        EasyMock.capture(producerCallbacks)));
+            producer.send(EasyMock.capture(sent),
+                EasyMock.capture(producerCallbacks)));
         IAnswer<Future<RecordMetadata>> expectResponse = new IAnswer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer() throws Throwable {
                 synchronized (producerCallbacks) {
                     for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
-                        cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
-                                                           0L, 0L, 0, 0), null);
+                        if (sendSuccess) {
+                            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0,
+                                0L, 0L, 0, 0), null);
+                        } else {
+                            cb.onCompletion(null, new TopicAuthorizationException("foo"));
+                        }
                     }
                     producerCallbacks.reset();
                 }
@@ -749,8 +778,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         else
             expect.andAnswer(expectResponse);
 
-        // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
-        expectTaskCommitRecord(anyTimes, succeed);
+        if (sendSuccess) {
+            // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
+            expectTaskCommitRecord(anyTimes, commitSuccess);
+        }
 
         return sent;
     }