Posted to issues@ignite.apache.org by "Vedran Ljubovic (Jira)" <ji...@apache.org> on 2023/05/19 12:51:00 UTC

[jira] [Commented] (IGNITE-19459) Kafka Connect IgniteSinkConnector drops messages in case of error

    [ https://issues.apache.org/jira/browse/IGNITE-19459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724257#comment-17724257 ] 

Vedran Ljubovic commented on IGNITE-19459:
------------------------------------------

After some experiments, I have found that simply moving the extractor initialization ahead of the others fixes the problem. Apparently StreamerContext.getStreamer() can sometimes take a few seconds; KC starts streaming messages before it completes, and those messages are dropped because the key is null. A patch is attached.
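As a self-contained sketch of that reordering (class, field, and method names here are assumptions modelled on the description above, not the actual ignite-kafka source): initializing the key extractor before the slow streamer setup means any record that arrives during startup can still be keyed.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for IgniteSinkTask; names are illustrative only.
public class SinkTaskSketch {
    // Key extractor, analogous to Ignite's SingleTupleExtractor.
    static volatile Function<String, Map.Entry<String, String>> extractor;
    static volatile boolean streamerReady;

    static void start() {
        // Initialize the extractor FIRST, so records arriving while the
        // (potentially slow) streamer initialization runs still get keys.
        extractor = payload -> new AbstractMap.SimpleEntry<>("key:" + payload, payload);

        // Simulated equivalent of the slow StreamerContext.getStreamer() call.
        streamerReady = true;
    }

    // Returns the keyed entry, or null if the record had to be dropped
    // (the "Failed to stream a record with null key!" case).
    static Map.Entry<String, String> put(String payload) {
        if (extractor == null)
            return null; // extractor not ready yet: record is silently lost
        return extractor.apply(payload);
    }

    public static void main(String[] args) {
        start();
        System.out.println(put("msg-1").getKey());
    }
}
```

With the extractor assigned as the first statement of start(), the window in which put() sees a null extractor disappears, which matches the behavior the patch aims for.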

> Kafka Connect IgniteSinkConnector drops messages in case of error
> -----------------------------------------------------------------
>
>                 Key: IGNITE-19459
>                 URL: https://issues.apache.org/jira/browse/IGNITE-19459
>             Project: Ignite
>          Issue Type: Bug
>          Components: extensions
>    Affects Versions: 2.15
>            Reporter: Vedran Ljubovic
>            Priority: Major
>         Attachments: event_dropping.patch
>
>
> We are using Kafka Connect (KC) to stream messages from Kafka to Ignite. Since the Kafka topic uses null keys, we created a custom SingleTupleExtractor to generate keys from the payload. This works well when everything is healthy. However, if there is any problem starting a cache on Ignite (for example, the cluster state is inactive or the cache has lost partitions), we expect KC to fail to start. Instead, KC starts, appears to be running, and the messages are dropped - which means that once the problem is resolved, KC will not attempt to resend the messages even after a restart! This is unacceptable for us; the system should be reliable and fault-tolerant.
> In logs we notice errors such as:
> {code:java}
> Failed to stream a record with null key! {code}
> which is misleading, since we do have a SingleTupleExtractor for exactly this purpose and we can see that it is not being called at all!
> When the KC REST API [1] is used, the connector state is reported as RUNNING, which means we have no way to detect this error other than parsing the logs, which is unreliable.
> Upon investigating this issue, we found the following:
>  * The Ignite connection and IgniteDataStreamer are declared as private static final fields of an inner class and are initialized when the start() method of IgniteSinkConnector is called. From the KC docs [2], we conclude that the initialize() method should be overridden and the connections created there, and that appropriate exception types should be thrown so that KC knows the connection has failed and terminates the task/connector.
>  * When the start() method is called, StreamerContext.getStreamer() on line 72 fails with an exception. This exception is not handled by KC, so it does not know the task failed to start. In addition, the code never reaches line 91 where the SingleTupleExtractor is created, so there is no extractor. The solution would be to catch all exception types and rethrow them as exceptions that KC recognizes as critical errors. Alternatively, the put() method should throw an exception if stopped is true.
>  * When the put() method is called with a record that has no key and no extractor is set, line 121 shows that the error is only logged and no exception is thrown, so KC assumes the message was streamed successfully. Here, a ConnectException should be thrown. If users want the current behavior (stream Kafka messages that have a key, skip those without one), they can set errors.tolerance = all in the connector config. [3]
> [1] [https://docs.confluent.io/platform/current/connect/references/restapi.html]
> [2] [https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/sink/SinkTask.html]
> [3] [https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/]
>  
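The error-propagation changes proposed in the bullets above can be sketched as follows. In a real connector you would throw org.apache.kafka.connect.errors.ConnectException; a stand-in runtime exception is used here so the sketch compiles without Kafka Connect on the classpath, and all method signatures are illustrative assumptions, not the actual IgniteSinkTask API.

```java
// Hypothetical sketch: fail fast in start(), refuse to drop records in put().
public class ErrorPropagationSketch {
    // Stand-in for org.apache.kafka.connect.errors.ConnectException.
    static class ConnectExceptionStandIn extends RuntimeException {
        ConnectExceptionStandIn(String msg) { super(msg); }
        ConnectExceptionStandIn(String msg, Throwable cause) { super(msg, cause); }
    }

    static volatile boolean stopped;

    static void start(Runnable streamerInit) {
        try {
            streamerInit.run(); // e.g. the StreamerContext.getStreamer() call
        } catch (Exception e) {
            stopped = true;
            // Rethrow so Kafka Connect marks the task FAILED instead of
            // leaving it in a RUNNING state that silently drops records.
            throw new ConnectExceptionStandIn("Failed to start sink task", e);
        }
    }

    static void put(String key, String payload) {
        if (stopped)
            throw new ConnectExceptionStandIn("Task is stopped; refusing to drop record");
        if (key == null)
            // Previously only logged; throwing lets the framework's
            // errors.tolerance setting decide whether to skip or fail.
            throw new ConnectExceptionStandIn("Record has null key and no extractor");
        // ... stream the record ...
    }

    public static void main(String[] args) {
        try {
            start(() -> { throw new IllegalStateException("cluster inactive"); });
        } catch (ConnectExceptionStandIn e) {
            System.out.println("task failed to start: " + e.getMessage());
        }
    }
}
```

With this shape, a cache that cannot start surfaces as a FAILED task in the KC REST API, and operators who prefer the old skip-on-error behavior can opt back in via errors.tolerance = all rather than having it silently imposed.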



--
This message was sent by Atlassian Jira
(v8.20.10#820010)