Posted to dev@drill.apache.org by "Justin Chen (Jira)" <ji...@apache.org> on 2021/09/13 22:15:00 UTC

[jira] [Created] (DRILL-7998) Drill queries for Kafka storage plugin returning incorrect/missing result set

Justin Chen created DRILL-7998:
----------------------------------

             Summary: Drill queries for Kafka storage plugin returning incorrect/missing result set
                 Key: DRILL-7998
                 URL: https://issues.apache.org/jira/browse/DRILL-7998
             Project: Apache Drill
          Issue Type: Bug
          Components: Storage - Kafka
    Affects Versions: 1.19.0
            Reporter: Justin Chen
         Attachments: case1_1.png, case1_2.png, case2_1.png, case2_2.png, case2_3.png

My team and I have hit two scenarios in which querying Kafka through Drill returns an incorrect result set. I'm unsure whether they share the same root cause.

*Case 1:*
Queries with an ORDER BY clause on kafkaMsgTimestamp return incorrect results.

topic_1 is a topic with 2 partitions, no log compaction, in JSON format.

 
{code:sql}
SELECT * FROM kafka.`topic_1` ORDER BY kafkaMsgTimestamp DESC LIMIT 10
{code}
Image attachment case1_1 shows that the latest kafkaMsgTimestamp was 1630631881114 (Fri Sep 03 2021 01:18:01 GMT+0000). 

 

However, consider the same topic queried with a pushdown filter on kafkaMsgTimestamp, using the threshold 1631160000000 (Thu Sep 09 2021 04:00:00 GMT+0000):

 
{code:sql}
SELECT * FROM kafka.`topic_1` WHERE kafkaMsgTimestamp > 1631160000000 LIMIT 10
{code}
Image attachment case1_2 shows that there are many messages with more recent timestamps. Thus, ordering on kafkaMsgTimestamp does not seem to return correct results.
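
As a quick sanity check on the two timestamps quoted above, the following Python sketch converts them from epoch milliseconds to UTC and computes the gap between them. It is not part of the original investigation; the helper name ms_to_utc and the variable names are made up for illustration, and it assumes kafkaMsgTimestamp is expressed in milliseconds since the Unix epoch, as the dates in this report imply.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_utc(epoch_ms: int) -> datetime:
    """Convert an epoch-millisecond timestamp to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=epoch_ms)


# Values quoted in this report (illustrative variable names).
latest_from_order_by = 1630631881114  # newest kafkaMsgTimestamp returned by the ORDER BY query
filter_threshold = 1631160000000      # lower bound used in the WHERE clause

print(ms_to_utc(latest_from_order_by).isoformat())
print(ms_to_utc(filter_threshold).isoformat())

# Every row matched by the filter is newer than filter_threshold, so a correct
# ORDER BY ... DESC should have returned a timestamp above the threshold.
gap_ms = filter_threshold - latest_from_order_by
print(f"ORDER BY result is at least {gap_ms / 86_400_000:.2f} days too old")
```

If the filtered query returns any row at all, the newest timestamp reported by the ORDER BY query is stale by at least the gap computed here.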

 

*Case 2:*

Queries with a WHERE clause do not return correct results unless an exact partition ID and offset are provided.

topic_2 is a topic with more than 200 partitions, log compaction enabled, in Avro format.

 
{code:sql}
SELECT * FROM kafka.`topic_2` WHERE `topic_2`.after.id = 1 AND `topic_2`.after.shop_id = 2 LIMIT 1
{code}
Image attachment case2_1 shows that Drill returns no record with id 1 and shop_id 2.

However, we manually confirmed that the record exists using a consumer and found its kafkaPartitionId and kafkaMsgOffset. We then added WHERE conditions on kafkaPartitionId and kafkaMsgTimestamp to narrow the scan and speed up the query:
{code:sql}
SELECT * FROM kafka.`topic_2` WHERE `topic_2`.after.id = 1 AND `topic_2`.after.shop_id = 2 AND kafkaPartitionId = 110 AND kafkaMsgTimestamp > 1628196400000 LIMIT 1{code}
 Image attachment case2_2 shows that the record still cannot be found by Drill.

 

Finally, the exact kafkaMsgOffset was specified, along with kafkaPartitionId and kafkaMsgTimestamp:
{code:sql}
SELECT * FROM kafka.`topic_2` WHERE `topic_2`.after.id = 1 AND `topic_2`.after.shop_id = 2 AND kafkaPartitionId = 110 AND kafkaMsgOffset = 85785074 AND kafkaMsgTimestamp > 1628196400000 LIMIT 1{code}
Image attachment case2_3 shows that Drill was only able to find the record when an exact partition and message offset were provided.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)