You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/15 22:35:11 UTC

[kafka] branch 2.1 updated: KAFKA-8320 : fix retriable exception package for source connectors (#6675)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 1fe0796  KAFKA-8320 : fix retriable exception package for source connectors (#6675)
1fe0796 is described below

commit 1fe07967035d523c0f2133d5f9508c3587d0924c
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Wed May 15 15:20:20 2019 -0700

    KAFKA-8320 : fix retriable exception package for source connectors (#6675)
    
    WorkerSourceTask is catching the exception from the wrong package, org.apache.kafka.common.errors. It is not clear from the API standpoint which package the Connect framework supports: the one from common or the one from connect. The safest approach is to support both packages, even though that is less desirable.
    
    Author: Magesh Nandakumar <ma...@gmail.com>
    Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../java/org/apache/kafka/connect/runtime/WorkerSourceTask.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 623a210..14f71a5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -31,6 +30,7 @@ import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
@@ -242,7 +242,7 @@ class WorkerSourceTask extends WorkerTask {
     protected List<SourceRecord> poll() throws InterruptedException {
         try {
             return task.poll();
-        } catch (RetriableException e) {
+        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
             log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
             // Do nothing. Let the framework poll whenever it's ready.
             return null;
@@ -340,7 +340,7 @@ class WorkerSourceTask extends WorkerTask {
                             }
                         });
                 lastSendFailed = false;
-            } catch (RetriableException e) {
+            } catch (org.apache.kafka.common.errors.RetriableException e) {
                 log.warn("{} Failed to send {}, backing off before retrying:", this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;