Posted to issues@drill.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/18 02:57:00 UTC

[jira] [Commented] (DRILL-5977) predicate pushdown support kafkaMsgOffset

    [ https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480062#comment-16480062 ] 

ASF GitHub Bot commented on DRILL-5977:
---------------------------------------

aravi5 opened a new pull request #1272: DRILL-5977: Filter Pushdown in Drill-Kafka plugin
URL: https://github.com/apache/drill/pull/1272
 
 
   # Implementation of Filter Pushdown in Drill-Kafka plugin
   
   This PR contains changes implementing the `Filter Pushdown` feature for conditions on `Metadata fields` such as *Timestamps*, *Offsets* and *Partitions*. Below is a high-level description to help reviewers.
   
   [Design document is available here](https://docs.google.com/document/d/1SZ4wO4Ii4nAHwgWbY6JJynPOseCha0DBBZJ2ig1RO0M/edit#heading=h.uvq8jbet5xdh)
   
   ## Topics, Partitions, Offsets and Timestamps
   
   Apache Kafka is a distributed `publisher-subscriber` system. Kafka consists of `Topics`, which are similar to channels. Producers produce to topics and Consumers subscribe to topics. In the Drill world, every topic is considered a `Table`.
   
   To achieve load balancing and parallelism, Kafka introduces the concept of `Partitions`. Each topic can have multiple partitions, and messages produced to a topic are distributed across partitions (producers can also specify the partition to produce to).
   
   Kafka is essentially a distributed log, so every message within a `Topic-Partition` is identified by a unique `Offset`. This offset represents the distance of any message from the _beginning_ of the topic-partition.
   
   Since Kafka can be used to store events / event logs, one would look to Kafka to find out when an event occurred. Every message is thus associated with a `Timestamp`. Kafka allows the producer (user application) to specify the timestamp value, or it can add its own (server / broker) timestamp before storing the message in the topic-partition. Note that because the user can specify the timestamp value, timestamps can appear out of order as we consume from beginning to end.
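   
   For illustration, here is a minimal consumer loop showing the per-message metadata described above (a sketch assuming a recent `kafka-clients` dependency; the broker address, group id and topic name are placeholders, not part of this PR):
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   
   public class MessageMetadataSketch {
     public static void main(String[] args) {
       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
       props.put("group.id", "metadata-sketch");         // placeholder group id
       props.put("key.deserializer",
           "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("value.deserializer",
           "org.apache.kafka.common.serialization.StringDeserializer");
   
       try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
         consumer.subscribe(Collections.singletonList("events")); // placeholder topic
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<String, String> r : records) {
           // These are the metadata fields the plugin exposes as
           // kafkaPartitionId, kafkaMsgOffset and kafkaMsgTimestamp.
           System.out.printf("partition=%d offset=%d timestamp=%d%n",
               r.partition(), r.offset(), r.timestamp());
         }
       }
     }
   }
   ```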
   
   ## Drill-Kafka Plugin
   
   The Drill-Kafka plugin is very useful for exploring data and performing analytics. Currently, all of the data that resides in a topic has to be consumed by Drill before applying filters. However, it is possible to reduce the number of messages read from Kafka based on conditions provided on the fields `kafkaMsgTimestamp`, `kafkaMsgOffset` and `kafkaPartitionId`.
   
   The scan specification for the Drill-Kafka plugin is a collection of specifications for the individual partitions within each topic. Each partition specification consists of `topicName`, `partitionId`, `startOffset` and `endOffset`; a sketch follows below.
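   
   As a rough sketch, such a per-partition specification could look like the following (illustrative only; the actual class in the plugin may differ in name and shape, and treating `endOffset` as exclusive is an assumption):
   ```java
   // Illustrative sketch only -- not necessarily the plugin's actual class.
   public class KafkaPartitionScanSpec {
     private final String topicName;
     private final int partitionId;
     private long startOffset;  // first offset to read (inclusive)
     private long endOffset;    // offset at which to stop (assumed exclusive)
   
     public KafkaPartitionScanSpec(String topicName, int partitionId,
                                   long startOffset, long endOffset) {
       this.topicName = topicName;
       this.partitionId = partitionId;
       this.startOffset = startOffset;
       this.endOffset = endOffset;
     }
   
     public String getTopicName() { return topicName; }
     public int getPartitionId() { return partitionId; }
   
     // Filter pushdown narrows the scan range by raising startOffset
     // and/or lowering endOffset.
     public void mergeStartOffset(long offset) { startOffset = Math.max(startOffset, offset); }
     public void mergeEndOffset(long offset) { endOffset = Math.min(endOffset, offset); }
   }
   ```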
   
   ## Filter Pushdown Implementation
   
   The filter pushdown implementation is governed by the idea that conditions on `metadata fields` can be translated into modified `startOffset` and `endOffset` values for each partition, thus reducing the amount of data read.
   
   ### Pushdown Timestamp
   A common query against an event-log system involves conditions on the timestamp field. For the Drill-Kafka plugin, the `kafkaMsgTimestamp` field maps to the timestamp stored as part of each message in Kafka. Pushdown is supported for the following comparison functions: `equal`, `greater_than` and `greater_than_or_equal_to`.
   
   Kafka exposes a `Consumer API` to obtain the earliest offset for a given timestamp value:
   ```java
   public java.util.Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch)
   ```
   This API is used to determine the `startOffset` for each partition in a topic. Note that timestamps may not appear in increasing order when reading from a Kafka topic, because Kafka gives users the flexibility to define the timestamp for a message. However, the above API returns the first offset (from the beginning of a topic-partition) whose timestamp is *greater than or equal* to the requested timestamp. Thus, we cannot support pushdown for `less_than` or `less_than_or_equal_to`, because a lesser timestamp may exist beyond the computed `endOffset`.
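   
   As a sketch of how a timestamp condition could be translated into per-partition start offsets using this API (the class and method names here are illustrative, not the plugin's actual code):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
   import org.apache.kafka.common.PartitionInfo;
   import org.apache.kafka.common.TopicPartition;
   
   public class TimestampPushdownSketch {
     // For a predicate like: kafkaMsgTimestamp >= 1526601600000
     public static Map<TopicPartition, Long> startOffsetsFor(
         KafkaConsumer<?, ?> consumer, String topic, long timestampMs) {
       Map<TopicPartition, Long> search = new HashMap<>();
       for (PartitionInfo p : consumer.partitionsFor(topic)) {
         search.put(new TopicPartition(topic, p.partition()), timestampMs);
       }
       // Kafka returns, per partition, the earliest offset whose timestamp
       // is greater than or equal to the requested timestamp.
       Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(search);
   
       Map<TopicPartition, Long> startOffsets = new HashMap<>();
       for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
         // A null value means no message with such a timestamp exists,
         // so that partition can be skipped entirely.
         if (e.getValue() != null) {
           startOffsets.put(e.getKey(), e.getValue().offset());
         }
       }
       return startOffsets;
     }
   }
   ```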
   
   
   ### Pushdown Offset
   Conditions on `kafkaMsgOffset` are pushed down for the `equal`, `greater_than`, `greater_than_or_equal_to`, `less_than` and `less_than_or_equal_to` functions, as sketched below.
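   
   A hedged sketch of that translation, reusing the illustrative `KafkaPartitionScanSpec` from above (the operator names follow the comparison functions listed; this is not the plugin's actual code):
   ```java
   // Illustrative translation of kafkaMsgOffset predicates into scan-range bounds.
   public class OffsetPushdownSketch {
     public static void pushdownOffset(KafkaPartitionScanSpec spec, String op, long value) {
       switch (op) {
         case "equal":
           spec.mergeStartOffset(value);      // start exactly at value...
           spec.mergeEndOffset(value + 1);    // ...and read a single offset
           break;
         case "greater_than":
           spec.mergeStartOffset(value + 1);
           break;
         case "greater_than_or_equal_to":
           spec.mergeStartOffset(value);
           break;
         case "less_than":
           spec.mergeEndOffset(value);        // endOffset assumed exclusive
           break;
         case "less_than_or_equal_to":
           spec.mergeEndOffset(value + 1);
           break;
         default:
           break;                             // condition not pushed down
       }
     }
   }
   ```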
   
   ### Pushdown Partition
   Conditions on `kafkaPartitionId` help limit the number of partitions to be scanned (useful for data exploration); see the sketch below.
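   
   A minimal illustrative sketch of such pruning, again reusing the `KafkaPartitionScanSpec` sketch from above (not the plugin's actual code):
   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   
   // Illustrative: a condition such as kafkaPartitionId = 2 prunes the list
   // of per-partition scan specs down to the matching partitions.
   public class PartitionPushdownSketch {
     public static List<KafkaPartitionScanSpec> prunePartitions(
         List<KafkaPartitionScanSpec> specs, int partitionId) {
       return specs.stream()
           .filter(s -> s.getPartitionId() == partitionId)
           .collect(Collectors.toList());
     }
   }
   ```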
   
   Even after creating the new scan specification based on the filter conditions, the filters must be re-applied, because the optimization only restricts the scan range.
   
   ### Pushdown with OR
   At any level of the expression tree, an OR node returns a new scan specification only if ALL conditions under the OR support pushdown; see the sketch below.
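   
   A self-contained sketch of that rule (all names here are illustrative): pushdown over an OR succeeds only if every child yields a scan range, in which case the ranges are unioned; a single non-pushable child forces a full scan.
   ```java
   import java.util.List;
   
   // Illustrative only: how an OR node could be handled during pushdown.
   // A null result means "this subtree cannot be pushed down".
   public class OrPushdownSketch {
     static class ScanRange {
       final long start;  // inclusive
       final long end;    // assumed exclusive
   
       ScanRange(long start, long end) { this.start = start; this.end = end; }
   
       ScanRange union(ScanRange other) {
         return new ScanRange(Math.min(start, other.start), Math.max(end, other.end));
       }
     }
   
     // Each element is the pushdown result of one child of the OR node.
     static ScanRange visitOr(List<ScanRange> childResults) {
       ScanRange merged = null;
       for (ScanRange child : childResults) {
         if (child == null) {
           return null; // one non-pushable condition blocks the entire OR
         }
         merged = (merged == null) ? child : merged.union(child);
       }
       return merged;
     }
   }
   ```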
   
   
   ## Handling corner-cases
   Kafka supports retention (`Time-To-Live`), so messages can expire at some point in time. It is therefore possible for queries to include expired or non-existent values for timestamps and offsets. The implementation handles these cases as well.
   
   
   ## Unit Test
   Unit tests are added and cover queries both where pushdown is supported and where it is not.
   ```
   Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 158.327 sec - in org.apache.drill.exec.store.kafka.KafkaFilterPushdownTest
   ```



> predicate pushdown support kafkaMsgOffset
> -----------------------------------------
>
>                 Key: DRILL-5977
>                 URL: https://issues.apache.org/jira/browse/DRILL-5977
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: B Anil Kumar
>            Assignee: Abhishek Ravi
>            Priority: Major
>             Fix For: 1.14.0
>
>
> As part of Kafka storage plugin review, below is the suggestion from Paul.
> {noformat}
> Does it make sense to provide a way to select a range of messages: a starting point or a count? Perhaps I want to run my query every five minutes, scanning only those messages since the previous scan. Or, I want to limit my take to, say, the next 1000 messages. Could we use a pseudo-column such as "kafkaMsgOffset" for that purpose? Maybe
> SELECT * FROM <some topic> WHERE kafkaMsgOffset > 12345
> {noformat}


