You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "slim bouguerra (JIRA)" <ji...@apache.org> on 2018/08/13 16:39:00 UTC

[jira] [Commented] (HIVE-20377) Hive Kafka Storage Handler

    [ https://issues.apache.org/jira/browse/HIVE-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578591#comment-16578591 ] 

slim bouguerra commented on HIVE-20377:
---------------------------------------

CC [~t3rmin4t0r] / [~ashutoshc]

> Hive Kafka Storage Handler
> --------------------------
>
>                 Key: HIVE-20377
>                 URL: https://issues.apache.org/jira/browse/HIVE-20377
>             Project: Hive
>          Issue Type: Bug
>    Affects Versions: 4.0.0
>            Reporter: slim bouguerra
>            Assignee: slim bouguerra
>            Priority: Major
>
> h1. Goal
> * Read streaming data form Kafka queue as an external table.
> * Allow streaming navigation by pushing down filters on Kafka record partition id, offset and timestamp. 
> * Insert streaming data form Kafka to an actual Hive internal table, using CTAS statement.
> h1. Example
> h2. Create the external table
> {code} 
> CREATE EXTERNAL TABLE kafka_table (`timestamp` timestamps, page string, `user` string, language string, added int, deleted int, flags string,comment string, namespace string)
> STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> TBLPROPERTIES 
> ("kafka.topic" = "wikipedia", 
> "kafka.bootstrap.servers"="brokeraddress:9092",
> "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe");
> {code}
> h2. Kafka Metadata
> In order to keep track of Kafka records the storage handler will add automatically the Kafka row metadata eg partition id, record offset and record timestamp. 
> {code}
> DESCRIBE EXTENDED kafka_table
> timestamp              	timestamp           	from deserializer   
> page                	string              	from deserializer   
> user                	string              	from deserializer   
> language            	string              	from deserializer   
> country             	string              	from deserializer   
> continent           	string              	from deserializer   
> namespace           	string              	from deserializer   
> newpage             	boolean             	from deserializer   
> unpatrolled         	boolean             	from deserializer   
> anonymous           	boolean             	from deserializer   
> robot               	boolean             	from deserializer   
> added               	int                 	from deserializer   
> deleted             	int                 	from deserializer   
> delta               	bigint              	from deserializer   
> __partition         	int                 	from deserializer   
> __offset            	bigint              	from deserializer   
> __timestamp         	bigint              	from deserializer   
> {code}
> h2. Filter push down.
> Newer Kafka consumers 0.11.0 and higher allow seeking on the stream based on a given offset. The proposed storage handler will be able to leverage such API by pushing down filters over metadata columns, namely __partition (int), __offset(long) and __timestamp(long)
> For instance Query like
> {code} 
> select `__offset` from kafka_table where (`__offset` < 10 and `__offset`>3 and `__partition` = 0) or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99) or (`__offset` = 109);
> {code}
> Will result on a scan of partition 0 only then read only records between offset 4 and 109. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)