You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "kaushik srinivas (Jira)" <ji...@apache.org> on 2021/03/02 14:32:00 UTC

[jira] [Commented] (KAFKA-12164) ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.

    [ https://issues.apache.org/jira/browse/KAFKA-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17293740#comment-17293740 ] 

kaushik srinivas commented on KAFKA-12164:
------------------------------------------

 Hi [~kkonstantine]

Can you provide some insights into this issue ?

Thanks,

Kaushik.

> ssue when kafka connect worker pod restart, during creation of nested partition directories in hdfs file system.
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: kaushik srinivas
>            Priority: Critical
>
> In our production labs, an issue is observed. Below is the sequence of the same.
>  # hdfs connector is added to the connect worker.
>  # hdfs connector is creating folders in hdfs /test1=1/test2=2/
> Based on the custom partitioner. Here test1 and test2 are two separate nested directories derived from multiple fields in the record using a custom partitioner.
>  # Now kafka connect hdfs connector uses below function calls to create the directories in the hdfs file system.
> fs.mkdirs(new Path(filename));
> ref: [https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/storage/HdfsStorage.java]
> Now the important thing to note is that if mkdirs() is a non atomic operation (i.e can result in partial execution if interrupted)
> then suppose the first directory ie test1 is created and just before creation of test2 in hdfs happens if there is a restart to the connect worker pod. Then the hdfs file system will remain with partial folders created for partitions during the restart time frames.
> So we might have conditions in hdfs as below
> /test1=0/test2=0/
> /test1=1/
> /test1=2/test2=2
> /test1=3/test2=3
> So the second partition has a missing directory in it. And if hive integration is enabled, hive metastore exceptions will occur since there is a partition expected from hive table is missing for few partitions in hdfs.
> *This can occur to any connector with some ongoing non atomic operation and a restart is triggered to kafka connect worker pod. This will result in some partially completed states in the system and may cause issues for the connector to continue its operation*.
> *This is a very critical issue and needs some attention on ways for handling the same.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)