Posted to issues@hive.apache.org by "Hive QA (JIRA)" <ji...@apache.org> on 2018/09/05 03:38:00 UTC
[jira] [Commented] (HIVE-20377) Hive Kafka Storage Handler
[ https://issues.apache.org/jira/browse/HIVE-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603866#comment-16603866 ]
Hive QA commented on HIVE-20377:
--------------------------------
Here are the results of testing the latest attachment:
https://issues.apache.org/jira/secure/attachment/12938298/HIVE-20377.19.patch
{color:green}SUCCESS:{color} +1 due to 8 test(s) being added or modified.
{color:green}SUCCESS:{color} +1 due to 14925 tests passed
Test results: https://builds.apache.org/job/PreCommit-HIVE-Build/13592/testReport
Console output: https://builds.apache.org/job/PreCommit-HIVE-Build/13592/console
Test logs: http://104.198.109.242/logs/PreCommit-HIVE-Build-13592/
Messages:
{noformat}
Executing org.apache.hive.ptest.execution.TestCheckPhase
Executing org.apache.hive.ptest.execution.PrepPhase
Executing org.apache.hive.ptest.execution.YetusPhase
Executing org.apache.hive.ptest.execution.ExecutionPhase
Executing org.apache.hive.ptest.execution.ReportingPhase
{noformat}
This message is automatically generated.
ATTACHMENT ID: 12938298 - PreCommit-HIVE-Build
> Hive Kafka Storage Handler
> --------------------------
>
> Key: HIVE-20377
> URL: https://issues.apache.org/jira/browse/HIVE-20377
> Project: Hive
> Issue Type: New Feature
> Affects Versions: 4.0.0
> Reporter: slim bouguerra
> Assignee: slim bouguerra
> Priority: Major
> Attachments: HIVE-20377.10.patch, HIVE-20377.11.patch, HIVE-20377.12.patch, HIVE-20377.15.patch, HIVE-20377.18.patch, HIVE-20377.18.patch, HIVE-20377.19.patch, HIVE-20377.19.patch, HIVE-20377.19.patch, HIVE-20377.4.patch, HIVE-20377.5.patch, HIVE-20377.6.patch, HIVE-20377.8.patch, HIVE-20377.8.patch, HIVE-20377.patch
>
>
> h1. Goal
> * Read streaming data from a Kafka queue as an external table.
> * Allow streaming navigation by pushing down filters on Kafka record partition id, offset and timestamp.
> * Insert streaming data from Kafka into an actual Hive internal table, using a CTAS statement.
> h1. Example
> h2. Create the external table
> {code}
> CREATE EXTERNAL TABLE kafka_table (`timestamp` timestamp, page string, `user` string, language string, added int, deleted int, flags string, comment string, namespace string)
> STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> TBLPROPERTIES
> ("kafka.topic" = "wikipedia",
> "kafka.bootstrap.servers"="brokeraddress:9092",
> "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe");
> {code}
> h2. Kafka Metadata
> To keep track of Kafka records, the storage handler automatically adds the Kafka row metadata, e.g. partition id, record offset and record timestamp.
> {code}
> DESCRIBE EXTENDED kafka_table
> timestamp timestamp from deserializer
> page string from deserializer
> user string from deserializer
> language string from deserializer
> country string from deserializer
> continent string from deserializer
> namespace string from deserializer
> newpage boolean from deserializer
> unpatrolled boolean from deserializer
> anonymous boolean from deserializer
> robot boolean from deserializer
> added int from deserializer
> deleted int from deserializer
> delta bigint from deserializer
> __partition int from deserializer
> __offset bigint from deserializer
> __timestamp bigint from deserializer
> {code}
> h2. Filter push down.
> Newer Kafka consumers (0.11.0 and higher) allow seeking on the stream based on a given offset. The proposed storage handler will be able to leverage this API by pushing down filters over the metadata columns, namely __partition (int), __offset (long) and __timestamp (long).
> For instance, a query like
> {code}
> select `__offset` from kafka_table where (`__offset` < 10 and `__offset`>3 and `__partition` = 0) or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99) or (`__offset` = 109);
> {code}
> will result in a scan of partition 0 only, reading only the records between offsets 4 and 109.
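The way such a predicate narrows a scan can be illustrated with a small model. The following is a minimal Python sketch, not the storage handler's actual code: the names `conjunct_range` and `covering_range` are hypothetical, and it assumes that each AND group yields an inclusive offset interval and that the OR of several groups is collapsed into one covering interval for a given partition.

```python
import math
from typing import List, Optional, Tuple

# One comparison on a metadata column: (column, operator, value).
Predicate = Tuple[str, str, int]
Range = Tuple[float, float]  # inclusive (low, high); high may be math.inf


def conjunct_range(conjunct: List[Predicate], partition: int) -> Optional[Range]:
    """Inclusive offset bounds implied by an AND of predicates, or None if
    the group cannot match any record of the given partition."""
    low, high = 0, math.inf
    for column, op, value in conjunct:
        if column == "__partition":
            # Only equality on the partition id is modelled here; other
            # partition comparisons are left unpruned (an assumption).
            if op == "=" and value != partition:
                return None
        elif column == "__offset":
            if op == ">":
                low = max(low, value + 1)
            elif op == ">=":
                low = max(low, value)
            elif op == "<":
                high = min(high, value - 1)
            elif op == "<=":
                high = min(high, value)
            elif op == "=":
                low, high = max(low, value), min(high, value)
            else:
                raise ValueError(f"unsupported operator: {op}")
    return (low, high) if low <= high else None


def covering_range(disjuncts: List[List[Predicate]], partition: int) -> Optional[Range]:
    """Smallest single interval covering every OR branch for one partition."""
    ranges = [r for r in (conjunct_range(c, partition) for c in disjuncts) if r]
    if not ranges:
        return None
    return (min(r[0] for r in ranges), max(r[1] for r in ranges))


# The WHERE clause of the example query, written as OR-of-ANDs.
QUERY = [
    [("__offset", "<", 10), ("__offset", ">", 3), ("__partition", "=", 0)],
    [("__partition", "=", 0), ("__offset", "<", 105), ("__offset", ">", 99)],
    [("__offset", "=", 109)],
]

print(covering_range(QUERY, partition=0))  # (4, 109)
```

For partition 0 the three groups give the intervals [4, 9], [100, 104] and [109, 109], whose covering interval is [4, 109], matching the range stated above.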
> h2. With timestamp seeks
> Seeking based on the internal timestamps allows the handler to run on recently arrived data, for example:
> {code}
> select count(*) from kafka_table where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '20' hours) ;
> {code}
> This allows implicit relationships between event timestamps and Kafka timestamps to be expressed in queries (i.e. event_timestamp is always less than the Kafka __timestamp, and the Kafka __timestamp is never more than 15 minutes from the event, etc.).
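The factor of 1000 in the query above is a unit conversion: `to_unix_timestamp` returns seconds, while Kafka record timestamps are expressed in milliseconds since the epoch. A minimal Python sketch of the same arithmetic, assuming UTC and using a hypothetical helper name `kafka_cutoff_millis`:

```python
from datetime import datetime, timedelta, timezone


def kafka_cutoff_millis(now: datetime, hours: int) -> int:
    """Epoch milliseconds for `now - hours`, mirroring
    1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '<hours>' hours)."""
    cutoff = now - timedelta(hours=hours)
    # timestamp() yields epoch seconds; truncate to whole seconds as the
    # SQL expression does, then scale to milliseconds.
    return 1000 * int(cutoff.timestamp())


# Example: records newer than 20 hours before the current time.
now = datetime.now(timezone.utc)
print(kafka_cutoff_millis(now, 20))
```

Records whose `__timestamp` is greater than the returned value are the ones the example query would count. Hive evaluates `CURRENT_TIMESTAMP` in the session time zone, so the UTC assumption here is only for illustration.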
> h2. More examples with Avro
> {code}
> CREATE EXTERNAL TABLE wiki_kafka_avro_table
> STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
> TBLPROPERTIES
> ("kafka.topic" = "wiki_kafka_avro_table",
> "kafka.bootstrap.servers"="localhost:9092",
> "kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
> 'avro.schema.literal'='{
> "type" : "record",
> "name" : "Wikipedia",
> "namespace" : "org.apache.hive.kafka",
> "version": "1",
> "fields" : [ {
> "name" : "isrobot",
> "type" : "boolean"
> }, {
> "name" : "channel",
> "type" : "string"
> }, {
> "name" : "timestamp",
> "type" : "string"
> }, {
> "name" : "flags",
> "type" : "string"
> }, {
> "name" : "isunpatrolled",
> "type" : "boolean"
> }, {
> "name" : "page",
> "type" : "string"
> }, {
> "name" : "diffurl",
> "type" : "string"
> }, {
> "name" : "added",
> "type" : "long"
> }, {
> "name" : "comment",
> "type" : "string"
> }, {
> "name" : "commentlength",
> "type" : "long"
> }, {
> "name" : "isnew",
> "type" : "boolean"
> }, {
> "name" : "isminor",
> "type" : "boolean"
> }, {
> "name" : "delta",
> "type" : "long"
> }, {
> "name" : "isanonymous",
> "type" : "boolean"
> }, {
> "name" : "user",
> "type" : "string"
> }, {
> "name" : "deltabucket",
> "type" : "double"
> }, {
> "name" : "deleted",
> "type" : "long"
> }, {
> "name" : "namespace",
> "type" : "string"
> } ]
> }'
> );
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)