Posted to commits@hive.apache.org by ha...@apache.org on 2018/10/13 17:48:21 UTC
[5/5] hive git commit: HIVE-20639 : Add ability to Write Data from Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)
HIVE-20639 : Add ability to Write Data from Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5ace1f78
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5ace1f78
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5ace1f78
Branch: refs/heads/master
Commit: 5ace1f7839c857902ff32181881aacf799c6c2fa
Parents: 7a88ffb
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Wed Sep 26 10:08:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Oct 13 10:42:53 2018 -0700
----------------------------------------------------------------------
itests/qtest-druid/pom.xml | 7 +-
kafka-handler/README.md | 217 +++++++++++
kafka-handler/pom.xml | 28 +-
.../hadoop/hive/kafka/GenericKafkaSerDe.java | 162 --------
.../hadoop/hive/kafka/HiveKafkaProducer.java | 256 +++++++++++++
.../hadoop/hive/kafka/KafkaInputFormat.java | 208 ++++++++++
.../hadoop/hive/kafka/KafkaInputSplit.java | 209 ++++++++++
.../hadoop/hive/kafka/KafkaJsonSerDe.java | 15 +-
.../hadoop/hive/kafka/KafkaOutputFormat.java | 117 ++++++
.../hive/kafka/KafkaPullerInputFormat.java | 205 ----------
.../hive/kafka/KafkaPullerInputSplit.java | 213 -----------
.../hive/kafka/KafkaPullerRecordReader.java | 173 ---------
.../hadoop/hive/kafka/KafkaRecordIterator.java | 75 ++--
.../hadoop/hive/kafka/KafkaRecordReader.java | 173 +++++++++
.../hadoop/hive/kafka/KafkaRecordWritable.java | 209 ----------
.../hadoop/hive/kafka/KafkaScanTrimmer.java | 83 ++--
.../apache/hadoop/hive/kafka/KafkaSerDe.java | 380 +++++++++++++++++++
.../hadoop/hive/kafka/KafkaStorageHandler.java | 313 ++++++++++++---
.../hadoop/hive/kafka/KafkaStreamingUtils.java | 255 -------------
.../hadoop/hive/kafka/KafkaTableProperties.java | 83 ++++
.../apache/hadoop/hive/kafka/KafkaUtils.java | 255 +++++++++++++
.../apache/hadoop/hive/kafka/KafkaWritable.java | 219 +++++++++++
.../hadoop/hive/kafka/MetadataColumn.java | 120 ++++++
.../apache/hadoop/hive/kafka/RetryUtils.java | 154 ++++++++
.../hadoop/hive/kafka/SimpleKafkaWriter.java | 185 +++++++++
.../hive/kafka/TransactionalKafkaWriter.java | 363 ++++++++++++++++++
.../hive/kafka/HiveKafkaProducerTest.java | 179 +++++++++
.../hadoop/hive/kafka/KafkaBrokerResource.java | 100 +++++
.../hadoop/hive/kafka/KafkaInputSplitTest.java | 126 ++++++
.../hive/kafka/KafkaPullerInputSplitTest.java | 131 -------
.../hive/kafka/KafkaRecordIteratorTest.java | 244 ++++++------
.../hive/kafka/KafkaRecordWritableTest.java | 84 ----
.../hadoop/hive/kafka/KafkaScanTrimmerTest.java | 46 +--
.../hive/kafka/KafkaStorageHandlerTest.java | 145 +++++++
.../hive/kafka/KafkaStreamingUtilsTest.java | 108 ------
.../hadoop/hive/kafka/KafkaUtilsTest.java | 124 ++++++
.../hadoop/hive/kafka/KafkaWritableTest.java | 94 +++++
.../hive/kafka/SimpleKafkaWriterTest.java | 194 ++++++++++
.../kafka/TransactionalKafkaWriterTest.java | 246 ++++++++++++
.../clientpositive/kafka_storage_handler.q | 52 +++
.../druid/kafka_storage_handler.q.out | 226 ++++++++++-
41 files changed, 4960 insertions(+), 1816 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/itests/qtest-druid/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index e566fcf..19cdf91 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -43,7 +43,7 @@
<druid.derby.version>10.11.1.1</druid.derby.version>
<druid.guava.version>16.0.1</druid.guava.version>
<druid.guice.version>4.1.0</druid.guice.version>
- <kafka.version>1.0.1</kafka.version>
+ <kafka.version>2.0.0</kafka.version>
</properties>
<dependencies>
<dependency>
@@ -207,6 +207,11 @@
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/README.md
----------------------------------------------------------------------
diff --git a/kafka-handler/README.md b/kafka-handler/README.md
new file mode 100644
index 0000000..706c77a
--- /dev/null
+++ b/kafka-handler/README.md
@@ -0,0 +1,217 @@
+# Kafka Storage Handler Module
+
+Storage handler that allows users to connect to, analyze, and transform Kafka topics.
+The workflow is as follows: first, the user creates an external table that is a view over one Kafka topic;
+then the user can run any SQL query, including writing back to the same table or to a different Kafka-backed table.
+
+## Usage
+
+### Create Table
+Use the following statement to create a table:
+```sql
+CREATE EXTERNAL TABLE kafka_table
+(`timestamp` timestamp , `page` string, `newPage` boolean,
+ added int, deleted bigint, delta double)
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
+```
+The table property `kafka.topic` is the Kafka topic to connect to and `kafka.bootstrap.servers` is the broker connection string.
+Both properties are mandatory.
+On the write path, if the topic does not exist it will be created, provided the Kafka broker's admin policy allows such an operation.
+
+By default the serializer and deserializer is the JSON SerDe `org.apache.hadoop.hive.serde2.JsonSerDe`.
+To switch the serializer/deserializer class, use ALTER TABLE:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+```
+List of supported serializers/deserializers:
+
+|Supported Serializer/Deserializer|
+|-----|
+|org.apache.hadoop.hive.serde2.JsonSerDe|
+|org.apache.hadoop.hive.serde2.OpenCSVSerde|
+|org.apache.hadoop.hive.serde2.avro.AvroSerDe|
+|org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe|
+|org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|
+
+#### Table definition
+In addition to the user-defined payload schema, the Kafka storage handler appends additional columns that allow the user to query the Kafka metadata fields (see the sample query after this list):
+- `__key` Kafka record key (byte array)
+- `__partition` Kafka record partition identifier (int 32)
+- `__offset` Kafka record offset (int 64)
+- `__timestamp` Kafka record timestamp (int 64)
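+
+For instance, a query along the following lines (a sketch against the example `kafka_table` defined above) surfaces the metadata next to the payload columns:
+```sql
+SELECT `__partition`, `__offset`, `__timestamp`, `page`, added
+FROM kafka_table LIMIT 5;
+```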
+
+
+### Query Table
+
+List the table properties and all the partition/offset information for the topic.
+```sql
+Describe extended kafka_table;
+```
+
+Count the number of records whose Kafka record timestamp falls within the last 10 minutes.
+
+```sql
+SELECT count(*) from kafka_table
+where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
+```
+The storage handler supports filter push-down read optimization;
+for instance, the query above will only read the records whose timestamp satisfies the filter predicate.
+Please note that such time-based seeks are only viable if the Kafka broker allows time-based lookups (Kafka 0.11 or later versions).
+
+In addition to **time-based seeks**, the storage handler reader is able to seek to a particular partition offset using the SQL WHERE clause.
+Currently only OR/AND with (<, <=, >=, >) is supported.
+
+```sql
+SELECT count(*) from kafka_table
+where (`__offset` < 10 and `__offset`>3 and `__partition` = 0)
+or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
+or (`__offset` = 109);
+```
+
+The user can define a view over the last 15 minutes of the stream and mask whichever columns they like, as follows:
+
+```sql
+CREATE VIEW last_15_minutes_of_kafka_table as select `timestamp`, `user`, delta, added from kafka_table
+where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
+```
+
+Join the Kafka stream to a Hive table. For instance, assume you want to join the last 15 minutes of the stream to a dimension table like the following:
+```sql
+CREATE TABLE user_table (`user` string, `first_name` string , age int, gender string, comments string) STORED as ORC ;
+```
+
+Join the view of the last 15 minutes to `user_table`, group by the gender column, and compute aggregates
+over metrics from both the fact table and the dimension table.
+
+```sql
+SELECT sum(added) as added, sum(deleted) as deleted, avg(delta) as delta, avg(age) as avg_age , gender
+FROM last_15_minutes_of_kafka_table join user_table on `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
+GROUP BY gender limit 10;
+```
+
+
+Join the stream to the stream itself, for cases where you want to perform an ad-hoc query over the last 15 minutes view.
+The following example shows how to perform a classical user retention analysis over the Kafka stream.
+```sql
+-- Stream join over the view itself
+-- The example is adapted from https://www.periscopedata.com/blog/how-to-calculate-cohort-retention-in-sql
+-- Assuming l15min_wiki is a view of the last 15 minutes
+select count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
+from l15min_wiki as activity
+left join l15min_wiki as future_activity on
+ activity.`user` = future_activity.`user`
+ and activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ;
+
+-- Stream to stream join
+-- Assuming wiki_kafka_hive is the entire stream.
+select floor_hour(activity.`timestamp`), count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
+from wiki_kafka_hive as activity
+left join wiki_kafka_hive as future_activity on
+ activity.`user` = future_activity.`user`
+ and activity.`timestamp` = future_activity.`timestamp` - interval '1' hour group by floor_hour(activity.`timestamp`);
+
+```
+
+# Configuration
+
+## Table Properties
+
+| Property | Description | Mandatory | Default |
+|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------------------------|
+| kafka.topic | Kafka topic name to map the table to. | Yes | null |
+| kafka.bootstrap.servers | Table property indicating Kafka broker(s) connection string. | Yes | null |
+| kafka.serde.class | Serializer and Deserializer class implementation. | No | org.apache.hadoop.hive.serde2.JsonSerDe |
+| hive.kafka.poll.timeout.ms | Parameter indicating the Kafka consumer poll timeout period, in milliseconds. Note that this is independent of internal Kafka consumer timeouts. | No | 5000 (5 Seconds) |
+| hive.kafka.max.retries | Number of retries for Kafka metadata fetch operations. | No | 6 |
+| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata. | No | 30000 (30 Seconds) |
+| kafka.write.semantic | Writer semantic; allowed values: BEST_EFFORT, AT_LEAST_ONCE, EXACTLY_ONCE. | No | AT_LEAST_ONCE |
+| hive.kafka.optimistic.commit | Boolean value indicating whether the producer should commit within the task or delegate the commit to HS2. | No | true |
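+
+For example, to switch the writer to exactly-once delivery, the semantic can be set with the same ALTER TABLE form used earlier (a sketch against the example `kafka_table`):
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.write.semantic" = "EXACTLY_ONCE");
+```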
+
+### Setting Extra Consumer/Producer Properties
+The user can inject custom Kafka consumer/producer properties via the table properties.
+To do so, add any Kafka config key/value pair as a Hive table property,
+prefixing the key with `kafka.consumer` for consumer configs or `kafka.producer` for producer configs.
+For instance, the following ALTER TABLE query adds the table property `"kafka.consumer.max.poll.records" = "5000"`
+and will inject `max.poll.records=5000` into the Kafka consumer.
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.consumer.max.poll.records"="5000");
+```
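+
+Similarly, a producer config can be injected via the `kafka.producer` prefix; for instance, the following sketch sets the standard Kafka producer `acks` config so that the producer waits for all in-sync replicas:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.producer.acks" = "all");
+```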
+
+# Kafka to Hive ETL Pipeline
+
+The goal is to load every record from Kafka exactly once,
+by reading the data and committing both the data and its offsets in a single transaction.
+
+First create the offset table.
+```sql
+Drop table kafka_table_offsets;
+create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
+```
+
+Initialize the table
+```sql
+insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP
+from wiki_kafka_hive group by `__partition`, CURRENT_TIMESTAMP ;
+```
+Create the final target table in the Hive warehouse.
+```sql
+Drop table orc_kafka_table;
+Create table orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint,
+ `timestamp` timestamp , `page` string, `user` string, `diffurl` string,
+ `isrobot` boolean, added int, deleted int, delta bigint
+) stored as ORC;
+```
+This is an example that inserts records up to offset = 2 only.
+
+```sql
+From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
+on (ktable.`__partition` = offset_table.partition_id
+and ktable.`__offset` > offset_table.max_offset and ktable.`__offset` < 3 )
+insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
+`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
+Insert overwrite table kafka_table_offsets select
+`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+```
+
+Double-check the insert.
+```sql
+select max(`koffset`) from orc_kafka_table limit 10;
+select count(*) as c from orc_kafka_table group by partition_id, koffset having c > 1;
+```
+
+Repeat this periodically to insert all data.
+
+```sql
+From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
+on (ktable.`__partition` = offset_table.partition_id
+and ktable.`__offset` > offset_table.max_offset )
+insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
+`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
+Insert overwrite table kafka_table_offsets select
+`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+```
+
+# ETL from Hive to Kafka
+
+## INSERT INTO
+First, create in Hive the table that will be the write target. All subsequent inserts will go to the Kafka topic mapped by this table.
+
+```sql
+CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
+(`channel` string, `namespace` string,`page` string, `timestamp` timestamp , avg_delta double )
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "moving_avg_wiki_kafka_hive_2",
+"kafka.bootstrap.servers"="cn105-10.l42scl.hortonworks.com:9092",
+-- STORE AS AVRO IN KAFKA
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+```
+
+Then insert data into the table. Keep in mind that Kafka is append-only; thus you cannot use insert overwrite.
+```sql
+insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`,
+avg(delta) over (order by `timestamp` asc rows between 60 preceding and current row) as avg_delta,
+null as `__key`, null as `__partition`, -1, -1,-1, -1 from l15min_wiki;
+```
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/pom.xml
----------------------------------------------------------------------
diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index 6c58bf1..f907e9d 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -30,7 +30,7 @@
<properties>
<hive.path.to.root>..</hive.path.to.root>
- <kafka.version>1.0.1</kafka.version>
+ <kafka.version>2.0.0</kafka.version>
</properties>
<artifactId>kafka-handler</artifactId>
@@ -38,12 +38,18 @@
<name>Hive Kafka Storage Handler</name>
<dependencies>
- <!-- intra-project -->
+ <!-- Intra-project -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<scope>provided</scope>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -52,10 +58,22 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -90,6 +108,12 @@
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
deleted file mode 100644
index a0c79b3..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.JsonSerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.rmi.server.UID;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Generic Kafka Serde that allow user to delegate Serde to other class like Avro,
- * Json or any class that supports {@link BytesWritable}.
- */
-public class GenericKafkaSerDe extends AbstractSerDe {
- private static final Logger LOG = LoggerFactory.getLogger(GenericKafkaSerDe.class);
-
- private AbstractSerDe delegateSerDe;
- private ObjectInspector objectInspector;
- private final List<String> columnNames = Lists.newArrayList();
- private StructObjectInspector delegateObjectInspector;
- private final UID uid = new UID();
- @SuppressWarnings("Guava") private Supplier<DatumReader<GenericRecord>> gdrSupplier;
-
- @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException {
- final String className = tbl.getProperty(KafkaStreamingUtils.SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName());
- delegateSerDe = KafkaStreamingUtils.createDelegate(className);
- //noinspection deprecation
- delegateSerDe.initialize(conf, tbl);
- LOG.debug("Using SerDe instance {}", delegateSerDe.getClass().getCanonicalName());
-
- if (!(delegateSerDe.getObjectInspector() instanceof StructObjectInspector)) {
- throw new SerDeException("Was expecting StructObject Inspector but have " + delegateSerDe.getObjectInspector()
- .getClass()
- .getName());
- }
- delegateObjectInspector = (StructObjectInspector) delegateSerDe.getObjectInspector();
-
- // Build column names Order matters here
- columnNames.addAll(delegateObjectInspector.getAllStructFieldRefs()
- .stream()
- .map(StructField::getFieldName)
- .collect(Collectors.toList()));
- columnNames.addAll(KafkaStreamingUtils.KAFKA_METADATA_COLUMN_NAMES);
-
- final List<ObjectInspector> inspectors = new ArrayList<>(columnNames.size());
- inspectors.addAll(delegateObjectInspector.getAllStructFieldRefs()
- .stream()
- .map(StructField::getFieldObjectInspector)
- .collect(Collectors.toList()));
- inspectors.addAll(KafkaStreamingUtils.KAFKA_METADATA_INSPECTORS);
- objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
-
- // lazy supplier to read Avro Records if needed
- gdrSupplier = getReaderSupplier(tbl);
- }
-
- @Override public Class<? extends Writable> getSerializedClass() {
- return delegateSerDe.getSerializedClass();
- }
-
- @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
- return delegateSerDe.serialize(obj, objInspector);
- }
-
- @Override public SerDeStats getSerDeStats() {
- return delegateSerDe.getSerDeStats();
- }
-
- @Override public Object deserialize(Writable blob) throws SerDeException {
- KafkaRecordWritable record = (KafkaRecordWritable) blob;
- // switch case the serde nature
- final Object row;
- if (delegateSerDe instanceof JsonSerDe) {
- //@TODO Text constructor copies the data, this op is not needed
- row = delegateSerDe.deserialize(new Text(record.getValue()));
- } else if (delegateSerDe instanceof AvroSerDe) {
- AvroGenericRecordWritable avroGenericRecordWritable = new AvroGenericRecordWritable();
- GenericRecord avroRecord;
- try {
- avroRecord = gdrSupplier.get().read(null, DecoderFactory.get().binaryDecoder(record.getValue(), null));
- avroGenericRecordWritable.setRecord(avroRecord);
- avroGenericRecordWritable.setRecordReaderID(uid);
- avroGenericRecordWritable.setFileSchema(avroRecord.getSchema());
- } catch (IOException e) {
- throw new SerDeException(e);
- }
- row = delegateSerDe.deserialize(avroGenericRecordWritable);
- } else {
- // default assuming delegate Serde know how to deal with
- row = delegateSerDe.deserialize(new BytesWritable(record.getValue()));
- }
-
- return columnNames.stream().map(name -> {
- Function<KafkaRecordWritable, Writable> metaColumnMapper = KafkaStreamingUtils.recordWritableFnMap.get(name);
- if (metaColumnMapper != null) {
- return metaColumnMapper.apply(record);
- }
- return delegateObjectInspector.getStructFieldData(row, delegateObjectInspector.getStructFieldRef(name));
- }).collect(Collectors.toList());
- }
-
- @Override public ObjectInspector getObjectInspector() {
- return objectInspector;
- }
-
- @SuppressWarnings("Guava") private Supplier<DatumReader<GenericRecord>> getReaderSupplier(Properties tbl) {
- return Suppliers.memoize(() -> {
- String schemaFromProperty = tbl.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), "");
- Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
- Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
- LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
- return new SpecificDatumReader<>(schema);
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
new file mode 100644
index 0000000..2270e08
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Kafka producer with public methods to extract the producer state and then resume the transaction in another process.
+ * This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
+ * Class is mostly taken from Apache Flink Project:
+ * org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+class HiveKafkaProducer<K, V> implements Producer<K, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducer.class);
+
+ private final KafkaProducer<K, V> kafkaProducer;
+
+ @Nullable private final String transactionalId;
+
+ HiveKafkaProducer(Properties properties) {
+ transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+ @Override public void initTransactions() {
+ kafkaProducer.initTransactions();
+ }
+
+ @Override public void beginTransaction() throws ProducerFencedException {
+ kafkaProducer.beginTransaction();
+ }
+
+ @Override public void commitTransaction() throws ProducerFencedException {
+ kafkaProducer.commitTransaction();
+ }
+
+ @Override public void abortTransaction() throws ProducerFencedException {
+ kafkaProducer.abortTransaction();
+ }
+
+ @Override public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
+ throws ProducerFencedException {
+ kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+ }
+
+ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+ return kafkaProducer.send(record);
+ }
+
+ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+ return kafkaProducer.send(record, callback);
+ }
+
+ @Override public List<PartitionInfo> partitionsFor(String topic) {
+ return kafkaProducer.partitionsFor(topic);
+ }
+
+ @Override public Map<MetricName, ? extends Metric> metrics() {
+ return kafkaProducer.metrics();
+ }
+
+ @Override public void close() {
+ kafkaProducer.close();
+ }
+
+ @Override public void close(long timeout, TimeUnit unit) {
+ kafkaProducer.close(timeout, unit);
+ }
+
+ @Override public void flush() {
+ kafkaProducer.flush();
+ if (transactionalId != null) {
+ flushNewPartitions();
+ }
+ }
+
+ /**
+ * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones,
+ * so that we can resume transaction after a restart. Implementation of this method is based on
+ * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
+ */
+ synchronized void resumeTransaction(long producerId, short epoch) {
+    Preconditions.checkState(producerId >= 0 && epoch >= 0,
+        "Incorrect values for producerId %s and epoch %s",
+        producerId,
+        epoch);
+ LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
+
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+
+ Object nextSequence = getValue(transactionManager, "nextSequence");
+ Object lastAckedSequence = getValue(transactionManager, "lastAckedSequence");
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+ invoke(nextSequence, "clear");
+ invoke(lastAckedSequence, "clear");
+
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ setValue(producerIdAndEpoch, "producerId", producerId);
+ setValue(producerIdAndEpoch, "epoch", epoch);
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+ invoke(transactionManager,
+ "transitionTo",
+ getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+ setValue(transactionManager, "transactionStarted", true);
+ }
+
+ @Nullable String getTransactionalId() {
+ return transactionalId;
+ }
+
+ long getProducerId() {
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ return (long) getValue(producerIdAndEpoch, "producerId");
+ }
+
+ short getEpoch() {
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+ return (short) getValue(producerIdAndEpoch, "epoch");
+ }
+
+ /**
+   * Besides committing, {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} also adds new
+   * partitions to the transaction. The flushNewPartitions method moves this logic to pre-commit/flush, to make
+   * resumeTransaction simpler.
+   * Otherwise resumeTransaction would have to restore the state of the not-yet-added/"in-flight" partitions.
+ */
+ private void flushNewPartitions() {
+ LOG.info("Flushing new partitions");
+ TransactionalRequestResult result = enqueueNewPartitions();
+ Object sender = getValue(kafkaProducer, "sender");
+ invoke(sender, "wakeup");
+ result.await();
+ }
+
+ private synchronized TransactionalRequestResult enqueueNewPartitions() {
+ Object transactionManager = getValue(kafkaProducer, "transactionManager");
+ Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+ invoke(transactionManager,
+ "enqueueRequest",
+ new Class[] {txnRequestHandler.getClass().getSuperclass()},
+ new Object[] {txnRequestHandler});
+ return (TransactionalRequestResult) getValue(txnRequestHandler,
+ txnRequestHandler.getClass().getSuperclass(),
+ "result");
+ }
+
+ @SuppressWarnings("unchecked") private static Enum<?> getEnum(String enumFullName) {
+ @SuppressWarnings("RegExpRedundantEscape") String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
+ if (x.length == 2) {
+ String enumClassName = x[0];
+ String enumName = x[1];
+ try {
+ Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
+ return Enum.valueOf(cl, enumName);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+ return null;
+ }
+
+ private static Object invoke(Object object, String methodName, Object... args) {
+ Class<?>[] argTypes = new Class[args.length];
+ for (int i = 0; i < args.length; i++) {
+ argTypes[i] = args[i].getClass();
+ }
+ return invoke(object, methodName, argTypes, args);
+ }
+
+ private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
+ try {
+ Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
+ method.setAccessible(true);
+ return method.invoke(object, args);
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+
+ private static Object getValue(Object object, String fieldName) {
+ return getValue(object, object.getClass(), fieldName);
+ }
+
+ private static Object getValue(Object object, Class<?> clazz, String fieldName) {
+ try {
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(object);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+
+ private static void setValue(Object object, String fieldName, Object value) {
+ try {
+ Field field = object.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(object, value);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException("Incompatible KafkaProducer version", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
new file mode 100644
index 0000000..c401df9
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka input format to read records from a Kafka queue.
+ * The input split contains the set of topic partitions and start/end offsets.
+ * Records are returned as byte arrays.
+ */
+@SuppressWarnings("WeakerAccess") public class KafkaInputFormat extends InputFormat<NullWritable, KafkaWritable>
+ implements org.apache.hadoop.mapred.InputFormat<NullWritable, KafkaWritable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaInputFormat.class);
+
+ @Override public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
+ List<KafkaInputSplit> inputSplits;
+ try {
+ inputSplits = computeSplits(jobConf);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ InputSplit[] inputSplitsArray = new InputSplit[inputSplits.size()];
+ return inputSplits.toArray(inputSplitsArray);
+ }
+
+ /**
+   * Build a full scan using the Kafka partition listing, then the beginning/end offsets.
+   * This function might block due to calls like:
+ * org.apache.kafka.clients.consumer.KafkaConsumer#beginningOffsets(java.util.Collection)
+ *
+ * @param topic kafka topic
+ * @param consumer initialized kafka consumer
+ * @param tablePaths hive table path
+ *
+ * @return full scan input split collection based on Kafka metadata APIs
+ */
+ private static List<KafkaInputSplit> buildFullScanFromKafka(String topic,
+ KafkaConsumer<byte[], byte[]> consumer,
+ Path[] tablePaths, int maxTries) {
+    final Map<TopicPartition, Long> startOffsetsMap;
+ final Map<TopicPartition, Long> endOffsetsMap;
+
+ final List<TopicPartition> topicPartitions;
+ RetryUtils.Task<List<TopicPartition>> fetchTPTask = () -> fetchTopicPartitions(topic, consumer);
+ try {
+ topicPartitions = RetryUtils.retry(fetchTPTask, (error) -> !KafkaUtils.exceptionIsFatal(error), maxTries);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+    startOffsetsMap = consumer.beginningOffsets(topicPartitions);
+ endOffsetsMap = consumer.endOffsets(topicPartitions);
+
+ if (LOG.isDebugEnabled()) {
+      LOG.debug("Found the following partitions [{}]",
+          topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")));
+      startOffsetsMap.forEach((tp, start) -> LOG.debug("TPartition [{}],Start offsets [{}]", tp, start));
+      endOffsetsMap.forEach((tp, end) -> LOG.debug("TPartition [{}],End offsets [{}]", tp, end));
+ }
+    return topicPartitions.stream()
+        .map(topicPartition -> new KafkaInputSplit(topicPartition.topic(),
+            topicPartition.partition(),
+            startOffsetsMap.get(topicPartition),
+            endOffsetsMap.get(topicPartition),
+            tablePaths[0]))
+        .collect(Collectors.toList());
+ }
+
+ private List<KafkaInputSplit> computeSplits(Configuration configuration)
+ throws IOException, InterruptedException {
+    // The ExecutorService is used to wrap blocking Kafka calls and interrupt them after a given duration
+ final ExecutorService execService = Executors.newSingleThreadExecutor();
+
+ try (KafkaConsumer consumer = new KafkaConsumer(KafkaUtils.consumerProperties(configuration))) {
+ final String topic = configuration.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
+ final long timeoutMs = configuration.getLong(KafkaTableProperties.KAFKA_FETCH_METADATA_TIMEOUT.getName(), -1);
+ final int maxTries = configuration.getInt(KafkaTableProperties.MAX_RETRIES.getName(), -1);
+ // hive depends on FileSplits
+ JobConf jobConf = new JobConf(configuration);
+ Path[] tablePaths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths(jobConf);
+
+ final Future<List<KafkaInputSplit>> futureFullHouse;
+ //noinspection unchecked
+ futureFullHouse = execService.submit(() -> buildFullScanFromKafka(topic, consumer, tablePaths, maxTries));
+ final List<KafkaInputSplit> fullHouse;
+ try {
+ fullHouse = futureFullHouse.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException | ExecutionException e) {
+ futureFullHouse.cancel(true);
+        LOG.error("Cannot generate full scan split", e);
+        // at this point we cannot go further; fail the split generation
+ throw new IOException(e);
+ }
+
+ @SuppressWarnings("unchecked") final ImmutableMap.Builder<TopicPartition, KafkaInputSplit>
+ fullHouseMapBuilder =
+ new ImmutableMap.Builder();
+ fullHouse.forEach(input -> fullHouseMapBuilder.put(new TopicPartition(input.getTopic(), input.getPartition()),
+ input));
+
+ final KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouseMapBuilder.build(), consumer);
+ final String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+
+ if (filterExprSerialized != null && !filterExprSerialized.isEmpty()) {
+ ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
+ LOG.info("Kafka trimmer working on Filter tree {}", filterExpr.getExprString());
+ Callable<List<KafkaInputSplit>>
+ trimmerWorker = () -> kafkaScanTrimmer.computeOptimizedScan(filterExpr)
+ .entrySet()
+ .stream()
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+
+ Future<List<KafkaInputSplit>> futureTinyHouse = execService.submit(trimmerWorker);
+ try {
+ return futureTinyHouse.get(timeoutMs, TimeUnit.MILLISECONDS)
+ .stream()
+ // filter out empty splits
+ .filter(split -> split.getStartOffset() < split.getEndOffset())
+ .collect(Collectors.toList());
+ } catch (ExecutionException | TimeoutException e) {
+ futureTinyHouse.cancel(true);
+        LOG.error("Had an issue with the trimmer; will return full scan", e);
+ return fullHouse;
+ }
+ }
+      // Null case: either the filter evaluated to false or there is no filter at all, thus return full scan
+ return fullHouse;
+ } finally {
+ execService.shutdown();
+ }
+ }
+
+ private static List<TopicPartition> fetchTopicPartitions(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+ // this will block till REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms"
+    // then throws org.apache.kafka.common.errors.TimeoutException if it cannot fetch the metadata
+ // @TODO add retry logic maybe
+ List<PartitionInfo> partitions = consumer.partitionsFor(topic);
+ return partitions.stream().map(p -> new TopicPartition(topic, p.partition())).collect(Collectors.toList());
+ }
+
+ @Override public RecordReader<NullWritable, KafkaWritable> getRecordReader(InputSplit inputSplit,
+ JobConf jobConf,
+ Reporter reporter) {
+ return new KafkaRecordReader((KafkaInputSplit) inputSplit, jobConf);
+ }
+
+ @Override public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext jobContext)
+ throws IOException, InterruptedException {
+ return computeSplits(jobContext.getConfiguration()).stream()
+ .map(kafkaPullerInputSplit -> (org.apache.hadoop.mapreduce.InputSplit) kafkaPullerInputSplit)
+ .collect(Collectors.toList());
+ }
+
+ @Override public org.apache.hadoop.mapreduce.RecordReader<NullWritable, KafkaWritable> createRecordReader(
+ org.apache.hadoop.mapreduce.InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) {
+ return new KafkaRecordReader();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
new file mode 100644
index 0000000..cb1f4df
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+import javax.annotation.Nullable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Kafka Hadoop Input Split Class.
+ */
+@SuppressWarnings("WeakerAccess") public class KafkaInputSplit extends FileSplit
+ implements org.apache.hadoop.mapred.InputSplit {
+ private String topic;
+ private long startOffset;
+ private int partition;
+ private long endOffset;
+
+ public KafkaInputSplit() {
+ super(null, 0, 0, (String[]) null);
+ }
+
+ public KafkaInputSplit(String topic, int partition, long startOffset, long endOffset, Path dummyPath) {
+ super(dummyPath, 0, 0, (String[]) null);
+ this.topic = topic;
+ this.startOffset = startOffset;
+ this.partition = partition;
+ this.endOffset = endOffset;
+    Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
+        "start [%s] has to be non-negative and <= end [%s]",
+        startOffset,
+        endOffset);
+ }
+
+ @Override public long getLength() {
+ return 0;
+ }
+
+ @Override public String[] getLocations() {
+ return new String[0];
+ }
+
+ @Override public void write(DataOutput dataOutput) throws IOException {
+ super.write(dataOutput);
+ dataOutput.writeUTF(topic);
+ dataOutput.writeInt(partition);
+ dataOutput.writeLong(startOffset);
+ dataOutput.writeLong(endOffset);
+ }
+
+ @Override public void readFields(DataInput dataInput) throws IOException {
+ super.readFields(dataInput);
+ topic = dataInput.readUTF();
+ partition = dataInput.readInt();
+ startOffset = dataInput.readLong();
+ endOffset = dataInput.readLong();
+    Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
+        "start [%s] has to be non-negative and <= end [%s]",
+        startOffset,
+        endOffset);
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getEndOffset() {
+ return endOffset;
+ }
+
+ /**
+ * Compute the intersection of 2 splits. Splits must share the same topic and partition number.
+ *
+ * @param split1 left split
+ * @param split2 right split
+ *
+   * @return new split that represents the range intersection, or null if the ranges do not overlap
+ */
+ @Nullable public static KafkaInputSplit intersectRange(KafkaInputSplit split1,
+ KafkaInputSplit split2) {
+ assert (split1.topic.equals(split2.topic));
+ assert (split1.partition == split2.partition);
+ final long startOffset = Math.max(split1.getStartOffset(), split2.getStartOffset());
+ final long endOffset = Math.min(split1.getEndOffset(), split2.getEndOffset());
+ if (startOffset > endOffset) {
+ // there is no overlapping
+ return null;
+ }
+ return new KafkaInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
+ }
+
+ /**
+ * Compute union of ranges between splits. Splits must share the same topic and partition
+ *
+ * @param split1 left split
+ * @param split2 right split
+ *
+ * @return new split with a range including both splits.
+ */
+ public static KafkaInputSplit unionRange(KafkaInputSplit split1, KafkaInputSplit split2) {
+ assert (split1.topic.equals(split2.topic));
+ assert (split1.partition == split2.partition);
+ final long startOffset = Math.min(split1.getStartOffset(), split2.getStartOffset());
+ final long endOffset = Math.max(split1.getEndOffset(), split2.getEndOffset());
+ return new KafkaInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof KafkaInputSplit)) {
+ return false;
+ }
+ KafkaInputSplit that = (KafkaInputSplit) o;
+ return Objects.equal(getTopic(), that.getTopic())
+ && Objects.equal(getStartOffset(), that.getStartOffset())
+ && Objects.equal(getPartition(), that.getPartition())
+ && Objects.equal(getEndOffset(), that.getEndOffset());
+ }
+
+ @Override public int hashCode() {
+ return Objects.hashCode(getTopic(), getStartOffset(), getPartition(), getEndOffset());
+ }
+
+ @Override public String toString() {
+ return "KafkaInputSplit{"
+ + "topic='"
+ + topic
+ + '\''
+ + ", startOffset="
+ + startOffset
+ + ", partition="
+ + partition
+ + ", endOffset="
+ + endOffset
+ + ", path="
+ + super.getPath().toString()
+ + '}';
+ }
+
+ public static KafkaInputSplit copyOf(KafkaInputSplit other) {
+ return new KafkaInputSplit(other.getTopic(),
+ other.getPartition(),
+ other.getStartOffset(),
+ other.getEndOffset(),
+ other.getPath());
+ }
+
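+  /**
+   * Slice the given split into consecutive sub-splits covering the same offset range,
+   * each spanning roughly {@code sliceSize} offsets; if the split is already small
+   * enough, a copy of it is returned as a singleton list.
+   *
+   * @param sliceSize maximum approximate number of offsets per sub-split
+   * @param split the input split to slice
+   *
+   * @return list of sub-splits covering the same range as the input split
+   */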
+ public static List<KafkaInputSplit> slice(long sliceSize, final KafkaInputSplit split) {
+ if (split.getEndOffset() - split.getStartOffset() > sliceSize) {
+ ImmutableList.Builder<KafkaInputSplit> builder = ImmutableList.builder();
+ long start = split.getStartOffset();
+ while (start < split.getEndOffset() - sliceSize) {
+ builder.add(new KafkaInputSplit(split.topic,
+ split.partition,
+ start,
+ start + sliceSize + 1,
+ split.getPath()));
+ start += sliceSize + 1;
+ }
+ // last split
+ if (start < split.getEndOffset()) {
+ builder.add(new KafkaInputSplit(split.topic,
+ split.partition,
+ start,
+ split.getEndOffset(),
+ split.getPath()));
+ }
+ return builder.build();
+ }
+
+ return Collections.singletonList(copyOf(split));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
index 5f0143d..228225c 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
@@ -78,15 +78,14 @@ import java.util.stream.Collectors;
* Basic JsonSerDe to make use of such storage handler smooth and easy and testing basic primitive Json.
* For production please use Hive native JsonSerde.
*/
-public class KafkaJsonSerDe extends AbstractSerDe {
+@SuppressWarnings("unused") class KafkaJsonSerDe extends AbstractSerDe {
private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonSerDe.class);
private static final ThreadLocal<DateTimeFormatter>
TS_PARSER =
ThreadLocal.withInitial(KafkaJsonSerDe::createAutoParser);
- private static final Function<TypeInfo, ObjectInspector>
- typeInfoToObjectInspector =
- typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
- typeInfo.getTypeName()));
+ private static final Function<TypeInfo, ObjectInspector> TYPEINFO_TO_OI =
+ typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+ TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()));
private List<String> columnNames;
private List<TypeInfo> columnTypes;
private ObjectInspector inspector;
@@ -118,7 +117,7 @@ public class KafkaJsonSerDe extends AbstractSerDe {
LOG.debug("types: {}, {} ", columnTypeProperty, columnTypes);
}
- inspectors = columnTypes.stream().map(typeInfoToObjectInspector).collect(Collectors.toList());
+ inspectors = columnTypes.stream().map(TYPEINFO_TO_OI).collect(Collectors.toList());
inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
}
@@ -236,8 +235,8 @@ public class KafkaJsonSerDe extends AbstractSerDe {
DateTimeParser
timeOrOffset =
new DateTimeFormatterBuilder().append(null,
- new DateTimeParser[] { new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
- new DateTimeFormatterBuilder().appendLiteral(' ').toParser() })
+ new DateTimeParser[] {new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
+ new DateTimeFormatterBuilder().appendLiteral(' ').toParser()})
.appendOptional(ISODateTimeFormat.timeElementParser().getParser())
.appendOptional(offsetElement.getParser())
.toParser();
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
new file mode 100644
index 0000000..950f731
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Kafka Hive Output Format class used to write Hive Rows to a Kafka Queue.
+ */
+public class KafkaOutputFormat implements HiveOutputFormat<NullWritable, KafkaWritable> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaOutputFormat.class);
+
+ @Override public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc,
+ Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable progress) {
+ final String topic = jc.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
+ final Boolean optimisticCommit = jc.getBoolean(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName(), true);
+ final WriteSemantic
+ writeSemantic =
+ WriteSemantic.valueOf(jc.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName()));
+ final Properties producerProperties = KafkaUtils.producerProperties(jc);
+ final FileSinkOperator.RecordWriter recordWriter;
+ switch (writeSemantic) {
+ case BEST_EFFORT:
+ recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), false, producerProperties);
+ break;
+ case AT_LEAST_ONCE:
+ recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), true, producerProperties);
+ break;
+ case EXACTLY_ONCE:
+ FileSystem fs;
+ try {
+ fs = finalOutPath.getFileSystem(jc);
+ } catch (IOException e) {
+ LOG.error("Can not construct file system instance", e);
+ throw new RuntimeException(e);
+ }
+ final String queryId = Preconditions.checkNotNull(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname, null));
+ recordWriter =
+ new TransactionalKafkaWriter(topic, producerProperties,
+ new Path(Preconditions.checkNotNull(finalOutPath), queryId),
+ fs,
+ optimisticCommit);
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown delivery semantic [%s]", writeSemantic));
+ }
+ return recordWriter;
+ }
+
+ @Override public RecordWriter<NullWritable, KafkaWritable> getRecordWriter(FileSystem fileSystem,
+ JobConf jobConf,
+ String s,
+ Progressable progressable) {
+ throw new RuntimeException("this is not suppose to be here");
+ }
+
+ @Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) {
+ // nothing to validate for a Kafka-backed output
+ }
+
+ /**
+ * Possible write semantics supported by the record writer.
+ */
+ enum WriteSemantic {
+ /**
+ * Best-effort delivery with no guarantees at all; the user can set producer properties as they wish,
+ * and the writer will carry on when possible, unless the exception is fatal.
+ */
+ BEST_EFFORT,
+ /**
+ * Deliver every record at least once unless the job fails.
+ * Duplicates can therefore be introduced by lost ACKs or task retries.
+ * Currently this is the default.
+ */
+ AT_LEAST_ONCE,
+ /**
+ * Deliver every record exactly once.
+ */
+ EXACTLY_ONCE,
+ }
+}
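KafkaOutputFormat picks a writer per WriteSemantic: SimpleKafkaWriter for BEST_EFFORT and AT_LEAST_ONCE (which differ only in a boolean flag passed to the writer), and TransactionalKafkaWriter for EXACTLY_ONCE. As a rough illustration of what each semantic means at the Kafka producer level, here is a hedged sketch; this is not the handler's actual code, and the broker address and transactional id are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class WriteSemanticSketch {
  static KafkaProducer<byte[], byte[]> producerFor(String semantic, String brokers) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    switch (semantic) {
      case "BEST_EFFORT":
        props.put(ProducerConfig.ACKS_CONFIG, "0");            // fire and forget
        break;
      case "AT_LEAST_ONCE":
        props.put(ProducerConfig.ACKS_CONFIG, "all");          // wait for full acknowledgment
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        break;
      case "EXACTLY_ONCE":
        // Transactional producer: the caller must wrap sends in
        // initTransactions()/beginTransaction()/commitTransaction().
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "hive-task-0"); // placeholder id
        break;
      default:
        throw new IllegalArgumentException("Unknown delivery semantic [" + semantic + "]");
    }
    return new KafkaProducer<>(props);
  }
}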
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
deleted file mode 100644
index 2d5637d..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-/**
- * Kafka puller input format, used to read records from a Kafka topic.
- * Each input split carries a topic partition plus start/end offsets.
- * Records are returned as byte arrays.
- */
-public class KafkaPullerInputFormat extends InputFormat<NullWritable, KafkaRecordWritable>
- implements org.apache.hadoop.mapred.InputFormat<NullWritable, KafkaRecordWritable> {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaPullerInputFormat.class);
-
- @Override public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
- List<KafkaPullerInputSplit> inputSplits;
- try {
- inputSplits = computeSplits(jobConf);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
- InputSplit[] inputSplitsArray = new InputSplit[inputSplits.size()];
- return inputSplits.toArray(inputSplitsArray);
- }
-
- /**
- * Build a full scan using Kafka list partition then beginning/end offsets.
- * This function might block due to calls like:
- * org.apache.kafka.clients.consumer.KafkaConsumer#beginningOffsets(java.util.Collection)
- *
- * @param topic kafka topic
- * @param consumer initialized kafka consumer
- * @param tablePaths hive table path
- *
- * @return full scan input split collection based on Kafka metadata APIs
- */
- private static List<KafkaPullerInputSplit> buildFullScanFromKafka(String topic,
- KafkaConsumer<byte[], byte[]> consumer,
- Path[] tablePaths) {
- final Map<TopicPartition, Long> starOffsetsMap;
- final Map<TopicPartition, Long> endOffsetsMap;
-
- final List<TopicPartition> topicPartitions;
- topicPartitions = fetchTopicPartitions(topic, consumer);
- startOffsetsMap = consumer.beginningOffsets(topicPartitions);
- endOffsetsMap = consumer.endOffsets(topicPartitions);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found the following partitions [{}]",
- topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")));
- startOffsetsMap.forEach((tp, start) -> LOG.debug("TopicPartition [{}], start offset [{}]", tp, start));
- endOffsetsMap.forEach((tp, end) -> LOG.debug("TopicPartition [{}], end offset [{}]", tp, end));
- }
- return topicPartitions.stream()
- .map(topicPartition -> new KafkaPullerInputSplit(topicPartition.topic(),
- topicPartition.partition(),
- startOffsetsMap.get(topicPartition),
- endOffsetsMap.get(topicPartition),
- tablePaths[0]))
- .collect(Collectors.toList());
- }
-
- private List<KafkaPullerInputSplit> computeSplits(Configuration configuration)
- throws IOException, InterruptedException {
- // this executor is used to bound blocking Kafka calls with a timeout
- final ExecutorService execService = Executors.newSingleThreadExecutor();
- try (KafkaConsumer consumer = new KafkaConsumer(KafkaStreamingUtils.consumerProperties(configuration))) {
- final String topic = configuration.get(KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
- final long
- timeoutMs =
- configuration.getLong(KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT,
- KafkaStreamingUtils.DEFAULT_CONSUMER_POLL_TIMEOUT_MS);
- // hive depends on FileSplits
- JobConf jobConf = new JobConf(configuration);
- Path[] tablePaths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths(jobConf);
-
- //noinspection unchecked
- Future<List<KafkaPullerInputSplit>>
- futureFullHouse =
- execService.submit(() -> buildFullScanFromKafka(topic, consumer, tablePaths));
- List<KafkaPullerInputSplit> fullHouse;
- try {
- fullHouse = futureFullHouse.get(timeoutMs, TimeUnit.MILLISECONDS);
- } catch (TimeoutException | ExecutionException e) {
- futureFullHouse.cancel(true);
- LOG.error("can not generate full scan split", e);
- // at this point we can not go further fail split generation
- throw new IOException(e);
- }
-
- @SuppressWarnings("unchecked") final ImmutableMap.Builder<TopicPartition, KafkaPullerInputSplit>
- fullHouseMapBuilder =
- new ImmutableMap.Builder();
- fullHouse.forEach(input -> fullHouseMapBuilder.put(new TopicPartition(input.getTopic(), input.getPartition()),
- input));
-
- final KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouseMapBuilder.build(), consumer);
- final String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-
- if (filterExprSerialized != null && !filterExprSerialized.isEmpty()) {
- ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
- LOG.info("Kafka trimmer working on Filter tree {}", filterExpr.getExprString());
- Callable<List<KafkaPullerInputSplit>>
- trimmerWorker = () -> kafkaScanTrimmer.computeOptimizedScan(filterExpr)
- .entrySet()
- .stream()
- .map(Map.Entry::getValue)
- .collect(Collectors.toList());
-
- Future<List<KafkaPullerInputSplit>> futureTinyHouse = execService.submit(trimmerWorker);
- try {
- return futureTinyHouse.get(timeoutMs, TimeUnit.MILLISECONDS)
- .stream()
- // filter out empty splits
- .filter(split -> split.getStartOffset() < split.getEndOffset())
- .collect(Collectors.toList());
- } catch (ExecutionException | TimeoutException e) {
- futureTinyHouse.cancel(true);
- LOG.error("Had issue with trimmer will return full scan ", e);
- return fullHouse;
- }
- }
- // Null filter case: either the filter evaluated to false or there is no filter at all, thus return the full scan.
- return fullHouse;
- } finally {
- execService.shutdown();
- }
- }
-
- private static List<TopicPartition> fetchTopicPartitions(String topic, KafkaConsumer<byte[], byte[]> consumer) {
- // this will block up to REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms",
- // then throws org.apache.kafka.common.errors.TimeoutException if the metadata cannot be fetched
- // @TODO add retry logic maybe
- List<PartitionInfo> partitions = consumer.partitionsFor(topic);
- return partitions.stream().map(p -> new TopicPartition(topic, p.partition())).collect(Collectors.toList());
- }
-
- @Override public RecordReader<NullWritable, KafkaRecordWritable> getRecordReader(InputSplit inputSplit,
- JobConf jobConf,
- Reporter reporter) {
- return new KafkaPullerRecordReader((KafkaPullerInputSplit) inputSplit, jobConf);
- }
-
- @Override public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext jobContext)
- throws IOException, InterruptedException {
- return computeSplits(jobContext.getConfiguration()).stream()
- .map(kafkaPullerInputSplit -> (org.apache.hadoop.mapreduce.InputSplit) kafkaPullerInputSplit)
- .collect(Collectors.toList());
- }
-
- @Override public org.apache.hadoop.mapreduce.RecordReader<NullWritable, KafkaRecordWritable> createRecordReader(
- org.apache.hadoop.mapreduce.InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext) {
- return new KafkaPullerRecordReader();
- }
-}
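The deleted split computation above boils down to three consumer metadata calls, partitionsFor, beginningOffsets, and endOffsets, each of which can block, hence the single-thread executor guarded by a timeout. A standalone sketch of the same full-scan enumeration (broker address and topic name are placeholders):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class FullScanSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    String topic = "test-topic";
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
          .map(p -> new TopicPartition(topic, p.partition()))
          .collect(Collectors.toList());
      Map<TopicPartition, Long> start = consumer.beginningOffsets(partitions);
      Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
      // One [start, end) range per partition is exactly one full-scan input split.
      partitions.forEach(tp ->
          System.out.printf("%s -> [%d, %d)%n", tp, start.get(tp), end.get(tp)));
    }
  }
}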
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
deleted file mode 100644
index 697469c..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-import javax.annotation.Nullable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Kafka Hadoop Input Split Class.
- */
-@SuppressWarnings("WeakerAccess") public class KafkaPullerInputSplit extends FileSplit
- implements org.apache.hadoop.mapred.InputSplit {
- private String topic;
- private long startOffset;
- private int partition;
- private long endOffset;
-
- public KafkaPullerInputSplit() {
- super(null, 0, 0, (String[]) null);
- }
-
- public KafkaPullerInputSplit(String topic, int partition, long startOffset, long endOffset, Path dummyPath) {
- super(dummyPath, 0, 0, (String[]) null);
- this.topic = topic;
- this.startOffset = startOffset;
- this.partition = partition;
- this.endOffset = endOffset;
- Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
- "start [%s] has to be positive and >= end [%]",
- startOffset,
- endOffset);
- }
-
- @Override public long getLength() {
- return 0;
- }
-
- @Override public String[] getLocations() {
- return new String[0];
- }
-
- @Override public void write(DataOutput dataOutput) throws IOException {
- super.write(dataOutput);
- dataOutput.writeUTF(topic);
- dataOutput.writeInt(partition);
- dataOutput.writeLong(startOffset);
- dataOutput.writeLong(endOffset);
- }
-
- @Override public void readFields(DataInput dataInput) throws IOException {
- super.readFields(dataInput);
- topic = dataInput.readUTF();
- partition = dataInput.readInt();
- startOffset = dataInput.readLong();
- endOffset = dataInput.readLong();
- Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
- "start [%s] has to be positive and >= end [%]",
- startOffset,
- endOffset);
- }
-
- public String getTopic() {
- return topic;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public long getStartOffset() {
- return startOffset;
- }
-
- public long getEndOffset() {
- return endOffset;
- }
-
- /**
- * Compute the intersection of 2 splits. Splits must share the same topic and partition number.
- *
- * @param split1 left split
- * @param split2 right split
- *
- * @return a new split representing the range intersection, or null if the ranges do not overlap
- */
- @Nullable public static KafkaPullerInputSplit intersectRange(KafkaPullerInputSplit split1,
- KafkaPullerInputSplit split2) {
- assert (split1.topic.equals(split2.topic));
- assert (split1.partition == split2.partition);
- final long startOffset = Math.max(split1.getStartOffset(), split2.getStartOffset());
- final long endOffset = Math.min(split1.getEndOffset(), split2.getEndOffset());
- if (startOffset > endOffset) {
- // the ranges do not overlap
- return null;
- }
- return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
- }
-
- /**
- * Compute the union of the splits' ranges. Splits must share the same topic and partition.
- *
- * @param split1 left split
- * @param split2 right split
- *
- * @return a new split whose range covers both splits.
- */
- public static KafkaPullerInputSplit unionRange(KafkaPullerInputSplit split1, KafkaPullerInputSplit split2) {
- assert (split1.topic.equals(split2.topic));
- assert (split1.partition == split2.partition);
- final long startOffset = Math.min(split1.getStartOffset(), split2.getStartOffset());
- final long endOffset = Math.max(split1.getEndOffset(), split2.getEndOffset());
- return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
- }
-
- @Override public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof KafkaPullerInputSplit)) {
- return false;
- }
- KafkaPullerInputSplit that = (KafkaPullerInputSplit) o;
- return Objects.equal(getTopic(), that.getTopic())
- && Objects.equal(getStartOffset(), that.getStartOffset())
- && Objects.equal(getPartition(), that.getPartition())
- && Objects.equal(getEndOffset(), that.getEndOffset());
- }
-
- @Override public int hashCode() {
- return Objects.hashCode(getTopic(), getStartOffset(), getPartition(), getEndOffset());
- }
-
- @Override public String toString() {
- return "KafkaPullerInputSplit{"
- + "topic='"
- + topic
- + '\''
- + ", startOffset="
- + startOffset
- + ", partition="
- + partition
- + ", endOffset="
- + endOffset
- + ", path="
- + super.getPath().toString()
- + '}';
- }
-
- public static KafkaPullerInputSplit copyOf(KafkaPullerInputSplit other) {
- return new KafkaPullerInputSplit(other.getTopic(),
- other.getPartition(),
- other.getStartOffset(),
- other.getEndOffset(),
- other.getPath());
- }
-
- @SuppressWarnings("MethodDoesntCallSuperMethod") public KafkaPullerInputSplit clone() {
- return copyOf(this);
- }
-
- public static List<KafkaPullerInputSplit> slice(long sliceSize, final KafkaPullerInputSplit split) {
- if (split.getEndOffset() - split.getStartOffset() > sliceSize) {
- ImmutableList.Builder<KafkaPullerInputSplit> builder = ImmutableList.builder();
- long start = split.getStartOffset();
- while (start < split.getEndOffset() - sliceSize) {
- builder.add(new KafkaPullerInputSplit(split.topic,
- split.partition,
- start,
- start + sliceSize + 1,
- split.getPath()));
- start += sliceSize + 1;
- }
- // last split
- if (start < split.getEndOffset()) {
- builder.add(new KafkaPullerInputSplit(split.topic,
- split.partition,
- start,
- split.getEndOffset(),
- split.getPath()));
- }
- return builder.build();
- }
-
- return Collections.singletonList(copyOf(split));
- }
-
-}
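The slice arithmetic above is easiest to verify with concrete numbers: each slice spans sliceSize + 1 offsets and the last slice takes whatever remains. A small example against the class as defined above (the same logic carries over to the renamed KafkaInputSplit; the topic and path are dummies):

import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.kafka.KafkaPullerInputSplit;

public class SliceSketch {
  public static void main(String[] args) {
    // Range [0, 10) sliced with sliceSize = 3.
    KafkaPullerInputSplit split =
        new KafkaPullerInputSplit("test-topic", 0, 0L, 10L, new Path("/tmp/dummy"));
    List<KafkaPullerInputSplit> slices = KafkaPullerInputSplit.slice(3L, split);
    // Prints [0, 4), [4, 8), [8, 10): contiguous ranges covering the original split.
    slices.forEach(s -> System.out.printf("[%d, %d)%n", s.getStartOffset(), s.getEndOffset()));
  }
}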
http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
deleted file mode 100644
index 06a10b4..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Properties;
-
-/**
- * Kafka Records Reader implementation.
- */
-@SuppressWarnings("UnstableApiUsage") public class KafkaPullerRecordReader extends RecordReader<NullWritable, KafkaRecordWritable>
- implements org.apache.hadoop.mapred.RecordReader<NullWritable, KafkaRecordWritable> {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaPullerRecordReader.class);
-
- private KafkaConsumer<byte[], byte[]> consumer = null;
- private Configuration config = null;
- private KafkaRecordWritable currentWritableValue;
- private Iterator<ConsumerRecord<byte[], byte[]>> recordsCursor = null;
-
- private long totalNumberRecords = 0L;
- private long consumedRecords = 0L;
- private long readBytes = 0L;
- private volatile boolean started = false;
- private long startOffset = -1L;
- private long endOffset = Long.MAX_VALUE;
-
- @SuppressWarnings("WeakerAccess") public KafkaPullerRecordReader() {
- }
-
- private void initConsumer() {
- if (consumer == null) {
- LOG.info("Initializing Kafka Consumer");
- final Properties properties = KafkaStreamingUtils.consumerProperties(config);
- String brokerString = properties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
- Preconditions.checkNotNull(brokerString, "broker endpoint cannot be null");
- LOG.info("Starting Consumer with Kafka broker string [{}]", brokerString);
- consumer = new KafkaConsumer<>(properties);
- }
- }
-
- @SuppressWarnings("WeakerAccess") public KafkaPullerRecordReader(KafkaPullerInputSplit inputSplit,
- Configuration jobConf) {
- initialize(inputSplit, jobConf);
- }
-
- private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configuration jobConf) {
- if (!started) {
- this.config = jobConf;
- startOffset = inputSplit.getStartOffset();
- endOffset = inputSplit.getEndOffset();
- TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
- Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset,
- "Start [%s] has to be positive and less or equal than End [%s]", startOffset, endOffset);
- totalNumberRecords += endOffset - startOffset;
- initConsumer();
- long
- pollTimeout =
- config.getLong(KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT,
- KafkaStreamingUtils.DEFAULT_CONSUMER_POLL_TIMEOUT_MS);
- LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
- this.recordsCursor =
- startOffset == endOffset ?
- new EmptyIterator() :
- new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
- started = true;
- }
- }
-
- @Override public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit,
- TaskAttemptContext context) {
- initialize((KafkaPullerInputSplit) inputSplit, context.getConfiguration());
- }
-
- @Override public boolean next(NullWritable nullWritable, KafkaRecordWritable bytesWritable) {
- if (started && recordsCursor.hasNext()) {
- ConsumerRecord<byte[], byte[]> record = recordsCursor.next();
- bytesWritable.set(record, startOffset, endOffset);
- consumedRecords += 1;
- readBytes += record.serializedValueSize();
- return true;
- }
- return false;
- }
-
- @Override public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override public KafkaRecordWritable createValue() {
- return new KafkaRecordWritable();
- }
-
- @Override public long getPos() {
- return -1;
- }
-
- @Override public boolean nextKeyValue() {
- currentWritableValue = new KafkaRecordWritable();
- if (next(NullWritable.get(), currentWritableValue)) {
- return true;
- }
- currentWritableValue = null;
- return false;
- }
-
- @Override public NullWritable getCurrentKey() {
- return NullWritable.get();
- }
-
- @Override public KafkaRecordWritable getCurrentValue() {
- return Preconditions.checkNotNull(currentWritableValue);
- }
-
- @Override public float getProgress() {
- if (consumedRecords == 0) {
- return 0f;
- }
- if (consumedRecords >= totalNumberRecords) {
- return 1f;
- }
- return consumedRecords * 1.0f / totalNumberRecords;
- }
-
- @Override public void close() {
- LOG.trace("total read bytes [{}]", readBytes);
- if (consumer != null) {
- consumer.wakeup();
- consumer.close();
- }
- }
-
- /**
- * Empty iterator for empty splits (startOffset == endOffset); added to avoid a clumsy if condition.
- */
- private static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public ConsumerRecord<byte[], byte[]> next() {
- throw new java.util.NoSuchElementException("this iterator is empty");
- }
- }
-}
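The deleted reader delegates the actual consumption to KafkaRecordIterator, whose contract is essentially a bounded consume: assign the partition, seek to the start offset, and poll until the end offset is reached or the poll times out. A minimal sketch of that loop (assuming kafka-clients 2.x for poll(Duration); broker, topic, and offsets are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BoundedReadSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    TopicPartition tp = new TopicPartition("test-topic", 0);
    long startOffset = 0L, endOffset = 100L;
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, startOffset);
      long next = startOffset;
      while (next < endOffset) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(5000));
        if (records.isEmpty()) {
          throw new IllegalStateException("poll timed out before reaching end offset " + endOffset);
        }
        for (ConsumerRecord<byte[], byte[]> record : records.records(tp)) {
          if (record.offset() >= endOffset) {
            return;                      // past the split boundary, stop
          }
          next = record.offset() + 1;
          // hand the record off, e.g. writable.set(record, startOffset, endOffset)
        }
      }
    }
  }
}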