Posted to commits@hive.apache.org by ha...@apache.org on 2018/10/13 17:48:21 UTC

[5/5] hive git commit: HIVE-20639 : Add ability to Write Data from Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)

HIVE-20639 : Add ability to Write Data from Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5ace1f78
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5ace1f78
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5ace1f78

Branch: refs/heads/master
Commit: 5ace1f7839c857902ff32181881aacf799c6c2fa
Parents: 7a88ffb
Author: Slim Bouguerra <sl...@gmail.com>
Authored: Wed Sep 26 10:08:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Oct 13 10:42:53 2018 -0700

----------------------------------------------------------------------
 itests/qtest-druid/pom.xml                      |   7 +-
 kafka-handler/README.md                         | 217 +++++++++++
 kafka-handler/pom.xml                           |  28 +-
 .../hadoop/hive/kafka/GenericKafkaSerDe.java    | 162 --------
 .../hadoop/hive/kafka/HiveKafkaProducer.java    | 256 +++++++++++++
 .../hadoop/hive/kafka/KafkaInputFormat.java     | 208 ++++++++++
 .../hadoop/hive/kafka/KafkaInputSplit.java      | 209 ++++++++++
 .../hadoop/hive/kafka/KafkaJsonSerDe.java       |  15 +-
 .../hadoop/hive/kafka/KafkaOutputFormat.java    | 117 ++++++
 .../hive/kafka/KafkaPullerInputFormat.java      | 205 ----------
 .../hive/kafka/KafkaPullerInputSplit.java       | 213 -----------
 .../hive/kafka/KafkaPullerRecordReader.java     | 173 ---------
 .../hadoop/hive/kafka/KafkaRecordIterator.java  |  75 ++--
 .../hadoop/hive/kafka/KafkaRecordReader.java    | 173 +++++++++
 .../hadoop/hive/kafka/KafkaRecordWritable.java  | 209 ----------
 .../hadoop/hive/kafka/KafkaScanTrimmer.java     |  83 ++--
 .../apache/hadoop/hive/kafka/KafkaSerDe.java    | 380 +++++++++++++++++++
 .../hadoop/hive/kafka/KafkaStorageHandler.java  | 313 ++++++++++++---
 .../hadoop/hive/kafka/KafkaStreamingUtils.java  | 255 -------------
 .../hadoop/hive/kafka/KafkaTableProperties.java |  83 ++++
 .../apache/hadoop/hive/kafka/KafkaUtils.java    | 255 +++++++++++++
 .../apache/hadoop/hive/kafka/KafkaWritable.java | 219 +++++++++++
 .../hadoop/hive/kafka/MetadataColumn.java       | 120 ++++++
 .../apache/hadoop/hive/kafka/RetryUtils.java    | 154 ++++++++
 .../hadoop/hive/kafka/SimpleKafkaWriter.java    | 185 +++++++++
 .../hive/kafka/TransactionalKafkaWriter.java    | 363 ++++++++++++++++++
 .../hive/kafka/HiveKafkaProducerTest.java       | 179 +++++++++
 .../hadoop/hive/kafka/KafkaBrokerResource.java  | 100 +++++
 .../hadoop/hive/kafka/KafkaInputSplitTest.java  | 126 ++++++
 .../hive/kafka/KafkaPullerInputSplitTest.java   | 131 -------
 .../hive/kafka/KafkaRecordIteratorTest.java     | 244 ++++++------
 .../hive/kafka/KafkaRecordWritableTest.java     |  84 ----
 .../hadoop/hive/kafka/KafkaScanTrimmerTest.java |  46 +--
 .../hive/kafka/KafkaStorageHandlerTest.java     | 145 +++++++
 .../hive/kafka/KafkaStreamingUtilsTest.java     | 108 ------
 .../hadoop/hive/kafka/KafkaUtilsTest.java       | 124 ++++++
 .../hadoop/hive/kafka/KafkaWritableTest.java    |  94 +++++
 .../hive/kafka/SimpleKafkaWriterTest.java       | 194 ++++++++++
 .../kafka/TransactionalKafkaWriterTest.java     | 246 ++++++++++++
 .../clientpositive/kafka_storage_handler.q      |  52 +++
 .../druid/kafka_storage_handler.q.out           | 226 ++++++++++-
 41 files changed, 4960 insertions(+), 1816 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/itests/qtest-druid/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index e566fcf..19cdf91 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -43,7 +43,7 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.version>1.0.1</kafka.version>
+    <kafka.version>2.0.0</kafka.version>
   </properties>
       <dependencies>
         <dependency>
@@ -207,6 +207,11 @@
           <artifactId>kafka_2.11</artifactId>
           <version>${kafka.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+          <version>1.7.25</version>
+        </dependency>
       </dependencies>
   <build>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/README.md
----------------------------------------------------------------------
diff --git a/kafka-handler/README.md b/kafka-handler/README.md
new file mode 100644
index 0000000..706c77a
--- /dev/null
+++ b/kafka-handler/README.md
@@ -0,0 +1,217 @@
+# Kafka Storage Handler Module
+
+Storage handler that allows users to connect to, analyze, and transform Kafka topics.
+The workflow is as follows: first, the user creates an external table that is a view over one Kafka topic;
+then the user can run any SQL query, including writing back to the same table or to a different Kafka-backed table.
+
+## Usage
+
+### Create Table
+Use the following statement to create a table:
+```sql
+CREATE EXTERNAL TABLE kafka_table
+(`timestamp` timestamp , `page` string,  `newPage` boolean,
+ added int, deleted bigint, delta double)
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "test-topic", "kafka.bootstrap.servers"="localhost:9092");
+```
+The table property `kafka.topic` is the Kafka topic to connect to and `kafka.bootstrap.servers` is the broker connection string.
+Both properties are mandatory.
+On the write path, if the topic does not exist it will be created, provided that the Kafka broker admin policy allows such an operation.
+
+By default the serializer and deserializer is JSON, namely `org.apache.hadoop.hive.serde2.JsonSerDe`.
+To switch the serializer/deserializer class, use ALTER TABLE:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+``` 
+List of supported serializers/deserializers (an example of setting the serde at table creation time follows the table):
+
+|Supported Serializer Deserializer|
+|-----|
+|org.apache.hadoop.hive.serde2.JsonSerDe|
+|org.apache.hadoop.hive.serde2.OpenCSVSerde|
+|org.apache.hadoop.hive.serde2.avro.AvroSerDe|
+|org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe|
+|org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe|
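+
+The serde class can also be set when the table is created instead of via ALTER TABLE. The following is a minimal sketch using the CSV serde from the table above; the column names, topic name and broker address are placeholders:
+```sql
+CREATE EXTERNAL TABLE kafka_csv_table (`col1` string, `col2` string)
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "csv-topic", "kafka.bootstrap.servers" = "localhost:9092",
+ "kafka.serde.class" = "org.apache.hadoop.hive.serde2.OpenCSVSerde");
+```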
+
+#### Table definition
+In addition to the user-defined payload schema, the Kafka storage handler appends additional columns that allow the user to query the Kafka metadata fields (an example query over these columns follows the list):
+- `__key` Kafka record key (byte array)
+- `__partition` Kafka record partition identifier (int 32)
+- `__offset` Kafka record offset (int 64)
+- `__timestamp` Kafka record timestamp (int 64)
+
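+For example, the metadata columns can be read directly alongside the payload columns. This is a minimal sketch against the `kafka_table` created above; the partition filter value is arbitrary:
+```sql
+SELECT `__partition`, `__offset`, `__timestamp`, `page`, added
+FROM kafka_table
+WHERE `__partition` = 0
+LIMIT 10;
+```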
+ 
+### Query Table
+
+List the table properties and all the partition/offset information for the topic:
+```sql
+Describe extended kafka_table;
+```
+
+Count the number of records whose Kafka record timestamp falls within the last 10 minutes:
+
+```sql
+SELECT count(*) from kafka_table 
+where `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);
+```
+The storage handler supports filter push-down read optimization;
+for instance, the query above will only read the records whose timestamps satisfy the filter predicate.
+Please note that such time-based seeks are only viable if the Kafka broker allows time-based lookups (Kafka 0.11 or later).
+
+In addition to **time-based seeks**, the storage handler reader is able to seek to a particular partition offset using the SQL WHERE clause.
+Currently only OR/AND with (<, <=, >=, >) are supported:
+
+```sql
+SELECT count(*)  from kafka_table
+where (`__offset` < 10 and `__offset`>3 and `__partition` = 0)
+or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
+or (`__offset` = 109);
+```
+
+The user can define a view that covers only the last 15 minutes and masks out whichever columns are not needed, as follows:
+
+```sql
+CREATE VIEW last_15_minutes_of_kafka_table as select  `timestamp`, `user`, delta, added from kafka_table 
+where `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '15' MINUTES);
+```
+
+Join the Kafka stream to a Hive table. For instance, assume you want to join the last 15 minutes of the stream to a dimension table like the following:
+```sql
+CREATE TABLE user_table (`user` string, `first_name` string , age int, gender string, comments string) STORED as ORC ;
+```
+
+Join the view of the last 15 minutes to `user_table`, group by the gender column and compute aggregates
+over metrics from both the fact table and the dimension table:
+
+```sql
+SELECT sum(added) as added, sum(deleted) as deleted, avg(delta) as delta, avg(age) as avg_age , gender 
+FROM last_15_minutes_of_kafka_table  join user_table on `last_15_minutes_of_kafka_table`.`user` = `user_table`.`user`
+GROUP BY gender limit 10;
+```
+
+
+Join the stream to itself, for cases where you want to perform an ad-hoc query over the last 15 minutes view.
+The following example shows how to perform a classical user retention analysis over the Kafka stream.
+```sql
+-- Stream join over the view itself
+-- The example is adapted from https://www.periscopedata.com/blog/how-to-calculate-cohort-retention-in-sql
+-- Assuming l15min_wiki is a view of the last 15 minutes
+select  count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
+from l15min_wiki as activity
+left join l15min_wiki as future_activity on
+  activity.`user` = future_activity.`user`
+  and activity.`timestamp` = future_activity.`timestamp` - interval '5' minutes ;
+
+--  Stream to stream join
+-- Assuming wiki_kafka_hive is the entire stream.
+select floor_hour(activity.`timestamp`), count( distinct activity.`user`) as active_users, count(distinct future_activity.`user`) as retained_users
+from wiki_kafka_hive as activity
+left join wiki_kafka_hive as future_activity on
+  activity.`user` = future_activity.`user`
+  and activity.`timestamp` = future_activity.`timestamp` - interval '1' hour group by floor_hour(activity.`timestamp`); 
+
+```
+
+# Configuration
+
+## Table Properties
+
+| Property                            | Description                                                                                                                        | Mandatory | Default                                 |
+|-------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------------------------|
+| kafka.topic                         | Kafka topic name to map the table to.                                                                                              | Yes       | null                                    |
+| kafka.bootstrap.servers             | Table property indicating Kafka broker(s) connection string.                                                                       | Yes       | null                                    |
+| kafka.serde.class                   | Serializer and Deserializer class implementation.                                                                                  | No        | org.apache.hadoop.hive.serde2.JsonSerDe |
+| hive.kafka.poll.timeout.ms          | Kafka consumer poll timeout period in milliseconds. Note that this is independent of the internal Kafka consumer timeouts.         | No        | 5000 (5 Seconds)                        |
+| hive.kafka.max.retries              | Number of retries for Kafka metadata fetch operations.                                                                             | No        | 6                                       |
+| hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata.                                                         | No        | 30000 (30 Seconds)                      |
+| kafka.write.semantic                | Writer semantic, allowed values (BEST_EFFORT, AT_LEAST_ONCE, EXACTLY_ONCE)                                                         | No        | AT_LEAST_ONCE                           |
+| hive.kafka.optimistic.commit        | Boolean value indicating whether the producer should commit during the task or delegate the commit to HS2.                         | No        | true                                    |
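+
+For example, the writer semantic and the commit mode can be changed from their defaults via ALTER TABLE. This is only a sketch; the property names and allowed values are taken from the table above:
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES
+("kafka.write.semantic" = "EXACTLY_ONCE", "hive.kafka.optimistic.commit" = "false");
+```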
+
+### Setting Extra Consumer/Producer Properties
+The user can inject custom Kafka consumer/producer properties via the table properties.
+To do so, add any key/value pair of Kafka config to the Hive table properties,
+prefixing the key with `kafka.consumer` for consumer configs and `kafka.producer` for producer configs.
+For instance, the following ALTER TABLE query adds the table property `"kafka.consumer.max.poll.records" = "5000"`
+and will inject `max.poll.records=5000` into the Kafka consumer.
+```sql
+ALTER TABLE kafka_table SET TBLPROPERTIES ("kafka.consumer.max.poll.records"="5000");
+```
+
+# Kafka to Hive ETL Pipeline
+
+This pipeline loads every record from Kafka into Hive exactly once.
+The goal is to read the data and commit both the data and its offsets in a single transaction.
+
+First, create the offsets table:
+```sql
+Drop table kafka_table_offsets;
+create table kafka_table_offsets(partition_id int, max_offset bigint, insert_time timestamp);
+``` 
+
+Initialize the table:
+```sql
+insert overwrite table kafka_table_offsets select `__partition`, min(`__offset`) - 1, CURRENT_TIMESTAMP 
+from wiki_kafka_hive group by `__partition`, CURRENT_TIMESTAMP ;
+``` 
+Create the final target table in the Hive warehouse:
+```sql
+Drop table orc_kafka_table;
+Create table orc_kafka_table (partition_id int, koffset bigint, ktimestamp bigint,
+ `timestamp` timestamp , `page` string, `user` string, `diffurl` string,
+ `isrobot` boolean, added int, deleted int, delta bigint
+) stored as ORC;
+```
+This is an example that inserts up to offset = 2 only:
+
+```sql
+From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
+on (ktable.`__partition` = offset_table.partition_id
+and ktable.`__offset` > offset_table.max_offset and  ktable.`__offset` < 3 )
+insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
+`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
+Insert overwrite table kafka_table_offsets select
+`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+```
+
+Double-check the insert:
+```sql
+select max(`koffset`) from orc_kafka_table limit 10;
+select count(*) as c  from orc_kafka_table group by partition_id, koffset having c > 1;
+```
+
+Repeat this periodically to insert all data.
+
+```sql
+From wiki_kafka_hive ktable JOIN kafka_table_offsets offset_table
+on (ktable.`__partition` = offset_table.partition_id
+and ktable.`__offset` > offset_table.max_offset )
+insert into table orc_kafka_table select `__partition`, `__offset`, `__timestamp`,
+`timestamp`, `page`, `user`, `diffurl`, `isrobot`, added , deleted , delta
+Insert overwrite table kafka_table_offsets select
+`__partition`, max(`__offset`), CURRENT_TIMESTAMP group by `__partition`, CURRENT_TIMESTAMP;
+```
+
+# ETL from Hive to Kafka
+
+## INSERT INTO
+First, create the table in Hive that will be the target table. All inserts into this table will go to the Kafka topic that the table is mapped to.
+
+```sql
+CREATE EXTERNAL TABLE moving_avg_wiki_kafka_hive
+(`channel` string, `namespace` string,`page` string, `timestamp` timestamp , avg_delta double )
+STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
+TBLPROPERTIES
+("kafka.topic" = "moving_avg_wiki_kafka_hive_2",
+"kafka.bootstrap.servers"="cn105-10.l42scl.hortonworks.com:9092",
+-- STORE AS AVRO IN KAFKA
+"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe");
+```
+
+Then insert data into the table. Keep in mind that Kafka is append-only, thus you cannot use INSERT OVERWRITE. Note that the example below also supplies null/-1 placeholders for the Kafka metadata columns.
+```sql
+insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`, 
+avg(delta) over (order by `timestamp` asc rows between  60 preceding and current row) as avg_delta, 
+null as `__key`, null as `__partition`, -1, -1,-1, -1 from l15min_wiki;
+```
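+
+To verify the write path, the Kafka-backed table can simply be read back (a minimal sanity check, assuming the insert above has completed):
+```sql
+select count(*) from moving_avg_wiki_kafka_hive;
+```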

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/pom.xml
----------------------------------------------------------------------
diff --git a/kafka-handler/pom.xml b/kafka-handler/pom.xml
index 6c58bf1..f907e9d 100644
--- a/kafka-handler/pom.xml
+++ b/kafka-handler/pom.xml
@@ -30,7 +30,7 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
-    <kafka.version>1.0.1</kafka.version>
+    <kafka.version>2.0.0</kafka.version>
   </properties>
 
   <artifactId>kafka-handler</artifactId>
@@ -38,12 +38,18 @@
   <name>Hive Kafka Storage Handler</name>
 
   <dependencies>
-    <!-- intra-project -->
+    <!-- Intra-project -->
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-exec</artifactId>
       <scope>provided</scope>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
@@ -52,10 +58,22 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-api</artifactId>
+      </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
@@ -90,6 +108,12 @@
       <version>${kafka.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.25</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
deleted file mode 100644
index a0c79b3..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/GenericKafkaSerDe.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.JsonSerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
-import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.rmi.server.UID;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Generic Kafka Serde that allow user to delegate Serde to other class like Avro,
- * Json or any class that supports {@link BytesWritable}.
- */
-public class GenericKafkaSerDe extends AbstractSerDe {
-  private static final Logger LOG = LoggerFactory.getLogger(GenericKafkaSerDe.class);
-
-  private AbstractSerDe delegateSerDe;
-  private ObjectInspector objectInspector;
-  private final List<String> columnNames = Lists.newArrayList();
-  private StructObjectInspector delegateObjectInspector;
-  private final UID uid = new UID();
-  @SuppressWarnings("Guava") private Supplier<DatumReader<GenericRecord>> gdrSupplier;
-
-  @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException {
-    final String className = tbl.getProperty(KafkaStreamingUtils.SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName());
-    delegateSerDe = KafkaStreamingUtils.createDelegate(className);
-    //noinspection deprecation
-    delegateSerDe.initialize(conf, tbl);
-    LOG.debug("Using SerDe instance {}", delegateSerDe.getClass().getCanonicalName());
-
-    if (!(delegateSerDe.getObjectInspector() instanceof StructObjectInspector)) {
-      throw new SerDeException("Was expecting StructObject Inspector but have " + delegateSerDe.getObjectInspector()
-          .getClass()
-          .getName());
-    }
-    delegateObjectInspector = (StructObjectInspector) delegateSerDe.getObjectInspector();
-
-    // Build column names Order matters here
-    columnNames.addAll(delegateObjectInspector.getAllStructFieldRefs()
-        .stream()
-        .map(StructField::getFieldName)
-        .collect(Collectors.toList()));
-    columnNames.addAll(KafkaStreamingUtils.KAFKA_METADATA_COLUMN_NAMES);
-
-    final List<ObjectInspector> inspectors = new ArrayList<>(columnNames.size());
-    inspectors.addAll(delegateObjectInspector.getAllStructFieldRefs()
-        .stream()
-        .map(StructField::getFieldObjectInspector)
-        .collect(Collectors.toList()));
-    inspectors.addAll(KafkaStreamingUtils.KAFKA_METADATA_INSPECTORS);
-    objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
-
-    // lazy supplier to read Avro Records if needed
-    gdrSupplier = getReaderSupplier(tbl);
-  }
-
-  @Override public Class<? extends Writable> getSerializedClass() {
-    return delegateSerDe.getSerializedClass();
-  }
-
-  @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
-    return delegateSerDe.serialize(obj, objInspector);
-  }
-
-  @Override public SerDeStats getSerDeStats() {
-    return delegateSerDe.getSerDeStats();
-  }
-
-  @Override public Object deserialize(Writable blob) throws SerDeException {
-    KafkaRecordWritable record = (KafkaRecordWritable) blob;
-    // switch case the serde nature
-    final Object row;
-    if (delegateSerDe instanceof JsonSerDe) {
-      //@TODO Text constructor copies the data, this op is not needed
-      row = delegateSerDe.deserialize(new Text(record.getValue()));
-    } else if (delegateSerDe instanceof AvroSerDe) {
-      AvroGenericRecordWritable avroGenericRecordWritable = new AvroGenericRecordWritable();
-      GenericRecord avroRecord;
-      try {
-        avroRecord = gdrSupplier.get().read(null, DecoderFactory.get().binaryDecoder(record.getValue(), null));
-        avroGenericRecordWritable.setRecord(avroRecord);
-        avroGenericRecordWritable.setRecordReaderID(uid);
-        avroGenericRecordWritable.setFileSchema(avroRecord.getSchema());
-      } catch (IOException e) {
-        throw new SerDeException(e);
-      }
-      row = delegateSerDe.deserialize(avroGenericRecordWritable);
-    } else {
-      // default assuming delegate Serde know how to deal with
-      row = delegateSerDe.deserialize(new BytesWritable(record.getValue()));
-    }
-
-    return columnNames.stream().map(name -> {
-      Function<KafkaRecordWritable, Writable> metaColumnMapper = KafkaStreamingUtils.recordWritableFnMap.get(name);
-      if (metaColumnMapper != null) {
-        return metaColumnMapper.apply(record);
-      }
-      return delegateObjectInspector.getStructFieldData(row, delegateObjectInspector.getStructFieldRef(name));
-    }).collect(Collectors.toList());
-  }
-
-  @Override public ObjectInspector getObjectInspector() {
-    return objectInspector;
-  }
-
-  @SuppressWarnings("Guava") private Supplier<DatumReader<GenericRecord>> getReaderSupplier(Properties tbl) {
-    return Suppliers.memoize(() -> {
-      String schemaFromProperty = tbl.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), "");
-      Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro Schema is empty Can not go further");
-      Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
-      LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
-      return new SpecificDatumReader<>(schema);
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
new file mode 100644
index 0000000..2270e08
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Kafka Producer with public methods to extract the producer state and then resume the transaction in another process.
+ * This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
+ * Class is mostly taken from Apache Flink Project:
+ * org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer
+ *
+ * @param <K> key serializer class.
+ * @param <V> value serializer class.
+ */
+class HiveKafkaProducer<K, V> implements Producer<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducer.class);
+
+  private final KafkaProducer<K, V> kafkaProducer;
+
+  @Nullable private final String transactionalId;
+
+  HiveKafkaProducer(Properties properties) {
+    transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+    kafkaProducer = new KafkaProducer<>(properties);
+  }
+
+  @Override public void initTransactions() {
+    kafkaProducer.initTransactions();
+  }
+
+  @Override public void beginTransaction() throws ProducerFencedException {
+    kafkaProducer.beginTransaction();
+  }
+
+  @Override public void commitTransaction() throws ProducerFencedException {
+    kafkaProducer.commitTransaction();
+  }
+
+  @Override public void abortTransaction() throws ProducerFencedException {
+    kafkaProducer.abortTransaction();
+  }
+
+  @Override public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
+      throws ProducerFencedException {
+    kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
+  }
+
+  @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+    return kafkaProducer.send(record);
+  }
+
+  @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+    return kafkaProducer.send(record, callback);
+  }
+
+  @Override public List<PartitionInfo> partitionsFor(String topic) {
+    return kafkaProducer.partitionsFor(topic);
+  }
+
+  @Override public Map<MetricName, ? extends Metric> metrics() {
+    return kafkaProducer.metrics();
+  }
+
+  @Override public void close() {
+    kafkaProducer.close();
+  }
+
+  @Override public void close(long timeout, TimeUnit unit) {
+    kafkaProducer.close(timeout, unit);
+  }
+
+  @Override public void flush() {
+    kafkaProducer.flush();
+    if (transactionalId != null) {
+      flushNewPartitions();
+    }
+  }
+
+  /**
+   * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones,
+   * so that we can resume transaction after a restart. Implementation of this method is based on
+   * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
+   */
+  synchronized void resumeTransaction(long producerId, short epoch) {
+    Preconditions.checkState(producerId >= 0 && epoch >= 0,
+        "Incorrect values for producerId {} and epoch {}",
+        producerId,
+        epoch);
+    LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
+
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+
+    Object nextSequence = getValue(transactionManager, "nextSequence");
+    Object lastAckedSequence = getValue(transactionManager, "lastAckedSequence");
+
+    invoke(transactionManager,
+        "transitionTo",
+        getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
+    invoke(nextSequence, "clear");
+    invoke(lastAckedSequence, "clear");
+
+    Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+    setValue(producerIdAndEpoch, "producerId", producerId);
+    setValue(producerIdAndEpoch, "epoch", epoch);
+
+    invoke(transactionManager,
+        "transitionTo",
+        getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));
+
+    invoke(transactionManager,
+        "transitionTo",
+        getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
+    setValue(transactionManager, "transactionStarted", true);
+  }
+
+  @Nullable String getTransactionalId() {
+    return transactionalId;
+  }
+
+  long getProducerId() {
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+    return (long) getValue(producerIdAndEpoch, "producerId");
+  }
+
+  short getEpoch() {
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
+    return (short) getValue(producerIdAndEpoch, "epoch");
+  }
+
+  /**
+   * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
+   * partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
+   * resumeTransaction simpler.
+   * Otherwise resumeTransaction would require to restore state of the not yet added/"in-flight" partitions.
+   */
+  private void flushNewPartitions() {
+    LOG.info("Flushing new partitions");
+    TransactionalRequestResult result = enqueueNewPartitions();
+    Object sender = getValue(kafkaProducer, "sender");
+    invoke(sender, "wakeup");
+    result.await();
+  }
+
+  private synchronized TransactionalRequestResult enqueueNewPartitions() {
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
+    invoke(transactionManager,
+        "enqueueRequest",
+        new Class[] {txnRequestHandler.getClass().getSuperclass()},
+        new Object[] {txnRequestHandler});
+    return (TransactionalRequestResult) getValue(txnRequestHandler,
+        txnRequestHandler.getClass().getSuperclass(),
+        "result");
+  }
+
+  @SuppressWarnings("unchecked") private static Enum<?> getEnum(String enumFullName) {
+    @SuppressWarnings("RegExpRedundantEscape") String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
+    if (x.length == 2) {
+      String enumClassName = x[0];
+      String enumName = x[1];
+      try {
+        Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
+        return Enum.valueOf(cl, enumName);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("Incompatible KafkaProducer version", e);
+      }
+    }
+    return null;
+  }
+
+  private static Object invoke(Object object, String methodName, Object... args) {
+    Class<?>[] argTypes = new Class[args.length];
+    for (int i = 0; i < args.length; i++) {
+      argTypes[i] = args[i].getClass();
+    }
+    return invoke(object, methodName, argTypes, args);
+  }
+
+  private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
+    try {
+      Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
+      method.setAccessible(true);
+      return method.invoke(object, args);
+    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+      throw new RuntimeException("Incompatible KafkaProducer version", e);
+    }
+  }
+
+  private static Object getValue(Object object, String fieldName) {
+    return getValue(object, object.getClass(), fieldName);
+  }
+
+  private static Object getValue(Object object, Class<?> clazz, String fieldName) {
+    try {
+      Field field = clazz.getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return field.get(object);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException("Incompatible KafkaProducer version", e);
+    }
+  }
+
+  private static void setValue(Object object, String fieldName, Object value) {
+    try {
+      Field field = object.getClass().getDeclaredField(fieldName);
+      field.setAccessible(true);
+      field.set(object, value);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new RuntimeException("Incompatible KafkaProducer version", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
new file mode 100644
index 0000000..c401df9
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputFormat.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka input format to read records from a Kafka queue.
+ * The input split will contain the set of topic partitions and start/end offsets.
+ * Records will be returned as byte arrays.
+ */
+@SuppressWarnings("WeakerAccess") public class KafkaInputFormat extends InputFormat<NullWritable, KafkaWritable>
+    implements org.apache.hadoop.mapred.InputFormat<NullWritable, KafkaWritable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaInputFormat.class);
+
+  @Override public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
+    List<KafkaInputSplit> inputSplits;
+    try {
+      inputSplits = computeSplits(jobConf);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+    InputSplit[] inputSplitsArray = new InputSplit[inputSplits.size()];
+    return inputSplits.toArray(inputSplitsArray);
+  }
+
+  /**
+   * Build a full scan using Kafka list partition then beginning/end offsets.
+   * This function might block due to calls like:
+   * org.apache.kafka.clients.consumer.KafkaConsumer#beginningOffsets(java.util.Collection)
+   *
+   * @param topic      kafka topic
+   * @param consumer   initialized kafka consumer
+   * @param tablePaths hive table path
+   *
+   * @return full scan input split collection based on Kafka metadata APIs
+   */
+  private static List<KafkaInputSplit> buildFullScanFromKafka(String topic,
+      KafkaConsumer<byte[], byte[]> consumer,
+      Path[] tablePaths, int maxTries) {
+    final Map<TopicPartition, Long> starOffsetsMap;
+    final Map<TopicPartition, Long> endOffsetsMap;
+
+    final List<TopicPartition> topicPartitions;
+    RetryUtils.Task<List<TopicPartition>> fetchTPTask = () -> fetchTopicPartitions(topic, consumer);
+    try {
+      topicPartitions = RetryUtils.retry(fetchTPTask, (error) -> !KafkaUtils.exceptionIsFatal(error), maxTries);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    starOffsetsMap = consumer.beginningOffsets(topicPartitions);
+    endOffsetsMap = consumer.endOffsets(topicPartitions);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Found the following partitions [{}]",
+          topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")));
+      starOffsetsMap.forEach((tp, start) -> LOG.info("TPartition [{}],Start offsets [{}]", tp, start));
+      endOffsetsMap.forEach((tp, end) -> LOG.info("TPartition [{}],End offsets [{}]", tp, end));
+    }
+    return topicPartitions.stream()
+        .map(topicPartition -> new KafkaInputSplit(topicPartition.topic(),
+            topicPartition.partition(),
+            starOffsetsMap.get(topicPartition),
+            endOffsetsMap.get(topicPartition),
+            tablePaths[0]))
+        .collect(Collectors.toList());
+  }
+
+  private List<KafkaInputSplit> computeSplits(Configuration configuration)
+      throws IOException, InterruptedException {
+    // ExecutorService is used to bound some blocking Kafka calls and interrupt them after some duration
+    final ExecutorService execService = Executors.newSingleThreadExecutor();
+
+    try (KafkaConsumer consumer = new KafkaConsumer(KafkaUtils.consumerProperties(configuration))) {
+      final String topic = configuration.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
+      final long timeoutMs = configuration.getLong(KafkaTableProperties.KAFKA_FETCH_METADATA_TIMEOUT.getName(), -1);
+      final int maxTries = configuration.getInt(KafkaTableProperties.MAX_RETRIES.getName(), -1);
+      // hive depends on FileSplits
+      JobConf jobConf = new JobConf(configuration);
+      Path[] tablePaths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths(jobConf);
+
+      final Future<List<KafkaInputSplit>> futureFullHouse;
+      //noinspection unchecked
+      futureFullHouse = execService.submit(() -> buildFullScanFromKafka(topic, consumer, tablePaths, maxTries));
+      final List<KafkaInputSplit> fullHouse;
+      try {
+        fullHouse = futureFullHouse.get(timeoutMs, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException | ExecutionException e) {
+        futureFullHouse.cancel(true);
+        LOG.error("can not generate full scan split", e);
+        // at this point we cannot go further; fail split generation
+        throw new IOException(e);
+      }
+
+      @SuppressWarnings("unchecked") final ImmutableMap.Builder<TopicPartition, KafkaInputSplit>
+          fullHouseMapBuilder =
+          new ImmutableMap.Builder();
+      fullHouse.forEach(input -> fullHouseMapBuilder.put(new TopicPartition(input.getTopic(), input.getPartition()),
+          input));
+
+      final KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouseMapBuilder.build(), consumer);
+      final String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+
+      if (filterExprSerialized != null && !filterExprSerialized.isEmpty()) {
+        ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
+        LOG.info("Kafka trimmer working on Filter tree {}", filterExpr.getExprString());
+        Callable<List<KafkaInputSplit>>
+            trimmerWorker = () -> kafkaScanTrimmer.computeOptimizedScan(filterExpr)
+                .entrySet()
+                .stream()
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+
+        Future<List<KafkaInputSplit>> futureTinyHouse = execService.submit(trimmerWorker);
+        try {
+          return futureTinyHouse.get(timeoutMs, TimeUnit.MILLISECONDS)
+              .stream()
+              // filter out empty splits
+              .filter(split -> split.getStartOffset() < split.getEndOffset())
+              .collect(Collectors.toList());
+        } catch (ExecutionException | TimeoutException e) {
+          futureTinyHouse.cancel(true);
+          LOG.error("Had issue with trimmer will return full scan ", e);
+          return fullHouse;
+        }
+      }
+      // Null filter case: either the filter was evaluated to false or there is no filter at all, thus return the full scan
+      return fullHouse;
+    } finally {
+      execService.shutdown();
+    }
+  }
+
+  private static List<TopicPartition> fetchTopicPartitions(String topic, KafkaConsumer<byte[], byte[]> consumer) {
+    // this will block till REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms"
+    // then throws org.apache.kafka.common.errors.TimeoutException if can not fetch metadata
+    // @TODO add retry logic maybe
+    List<PartitionInfo> partitions = consumer.partitionsFor(topic);
+    return partitions.stream().map(p -> new TopicPartition(topic, p.partition())).collect(Collectors.toList());
+  }
+
+  @Override public RecordReader<NullWritable, KafkaWritable> getRecordReader(InputSplit inputSplit,
+      JobConf jobConf,
+      Reporter reporter) {
+    return new KafkaRecordReader((KafkaInputSplit) inputSplit, jobConf);
+  }
+
+  @Override public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    return computeSplits(jobContext.getConfiguration()).stream()
+        .map(kafkaPullerInputSplit -> (org.apache.hadoop.mapreduce.InputSplit) kafkaPullerInputSplit)
+        .collect(Collectors.toList());
+  }
+
+  @Override public org.apache.hadoop.mapreduce.RecordReader<NullWritable, KafkaWritable> createRecordReader(
+      org.apache.hadoop.mapreduce.InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) {
+    return new KafkaRecordReader();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
new file mode 100644
index 0000000..cb1f4df
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaInputSplit.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+
+import javax.annotation.Nullable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Kafka Hadoop Input Split Class.
+ */
+@SuppressWarnings("WeakerAccess") public class KafkaInputSplit extends FileSplit
+    implements org.apache.hadoop.mapred.InputSplit {
+  private String topic;
+  private long startOffset;
+  private int partition;
+  private long endOffset;
+
+  public KafkaInputSplit() {
+    super(null, 0, 0, (String[]) null);
+  }
+
+  public KafkaInputSplit(String topic, int partition, long startOffset, long endOffset, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.topic = topic;
+    this.startOffset = startOffset;
+    this.partition = partition;
+    this.endOffset = endOffset;
+    Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
+        "start [%s] has to be positive and >= end [%]",
+        startOffset,
+        endOffset);
+  }
+
+  @Override public long getLength() {
+    return 0;
+  }
+
+  @Override public String[] getLocations() {
+    return new String[0];
+  }
+
+  @Override public void write(DataOutput dataOutput) throws IOException {
+    super.write(dataOutput);
+    dataOutput.writeUTF(topic);
+    dataOutput.writeInt(partition);
+    dataOutput.writeLong(startOffset);
+    dataOutput.writeLong(endOffset);
+  }
+
+  @Override public void readFields(DataInput dataInput) throws IOException {
+    super.readFields(dataInput);
+    topic = dataInput.readUTF();
+    partition = dataInput.readInt();
+    startOffset = dataInput.readLong();
+    endOffset = dataInput.readLong();
+    Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
+        "start [%s] has to be positive and >= end [%]",
+        startOffset,
+        endOffset);
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getPartition() {
+    return partition;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getEndOffset() {
+    return endOffset;
+  }
+
+  /**
+   * Compute the intersection of 2 splits. Splits must share the same topic and partition number.
+   *
+   * @param split1 left split
+   * @param split2 right split
+   *
+   * @return new split that represents range intersection or null if it is not overlapping
+   */
+  @Nullable public static KafkaInputSplit intersectRange(KafkaInputSplit split1,
+      KafkaInputSplit split2) {
+    assert (split1.topic.equals(split2.topic));
+    assert (split1.partition == split2.partition);
+    final long startOffset = Math.max(split1.getStartOffset(), split2.getStartOffset());
+    final long endOffset = Math.min(split1.getEndOffset(), split2.getEndOffset());
+    if (startOffset > endOffset) {
+      // there is no overlapping
+      return null;
+    }
+    return new KafkaInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
+  }
+
+  /**
+   * Compute union of ranges between splits. Splits must share the same topic and partition
+   *
+   * @param split1 left split
+   * @param split2 right split
+   *
+   * @return new split with a range including both splits.
+   */
+  public static KafkaInputSplit unionRange(KafkaInputSplit split1, KafkaInputSplit split2) {
+    assert (split1.topic.equals(split2.topic));
+    assert (split1.partition == split2.partition);
+    final long startOffset = Math.min(split1.getStartOffset(), split2.getStartOffset());
+    final long endOffset = Math.max(split1.getEndOffset(), split2.getEndOffset());
+    return new KafkaInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof KafkaInputSplit)) {
+      return false;
+    }
+    KafkaInputSplit that = (KafkaInputSplit) o;
+    return Objects.equal(getTopic(), that.getTopic())
+        && Objects.equal(getStartOffset(), that.getStartOffset())
+        && Objects.equal(getPartition(), that.getPartition())
+        && Objects.equal(getEndOffset(), that.getEndOffset());
+  }
+
+  @Override public int hashCode() {
+    return Objects.hashCode(getTopic(), getStartOffset(), getPartition(), getEndOffset());
+  }
+
+  @Override public String toString() {
+    return "KafkaInputSplit{"
+        + "topic='"
+        + topic
+        + '\''
+        + ", startOffset="
+        + startOffset
+        + ", partition="
+        + partition
+        + ", endOffset="
+        + endOffset
+        + ", path="
+        + super.getPath().toString()
+        + '}';
+  }
+
+  public static KafkaInputSplit copyOf(KafkaInputSplit other) {
+    return new KafkaInputSplit(other.getTopic(),
+        other.getPartition(),
+        other.getStartOffset(),
+        other.getEndOffset(),
+        other.getPath());
+  }
+
+  public static List<KafkaInputSplit> slice(long sliceSize, final KafkaInputSplit split) {
+    if (split.getEndOffset() - split.getStartOffset() > sliceSize) {
+      ImmutableList.Builder<KafkaInputSplit> builder = ImmutableList.builder();
+      long start = split.getStartOffset();
+      while (start < split.getEndOffset() - sliceSize) {
+        builder.add(new KafkaInputSplit(split.topic,
+            split.partition,
+            start,
+            start + sliceSize + 1,
+            split.getPath()));
+        start += sliceSize + 1;
+      }
+      // last split
+      if (start < split.getEndOffset()) {
+        builder.add(new KafkaInputSplit(split.topic,
+            split.partition,
+            start,
+            split.getEndOffset(),
+            split.getPath()));
+      }
+      return builder.build();
+    }
+
+    return Collections.singletonList(copyOf(split));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
index 5f0143d..228225c 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaJsonSerDe.java
@@ -78,15 +78,14 @@ import java.util.stream.Collectors;
  * Basic JsonSerDe to make use of such storage handler smooth and easy and testing basic primitive Json.
  * For production please use Hive native JsonSerde.
  */
-public class KafkaJsonSerDe extends AbstractSerDe {
+@SuppressWarnings("unused") class KafkaJsonSerDe extends AbstractSerDe {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonSerDe.class);
   private static final ThreadLocal<DateTimeFormatter>
       TS_PARSER =
       ThreadLocal.withInitial(KafkaJsonSerDe::createAutoParser);
-  private static final Function<TypeInfo, ObjectInspector>
-      typeInfoToObjectInspector =
-      typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
-          typeInfo.getTypeName()));
+  private static final Function<TypeInfo, ObjectInspector> TYPEINFO_TO_OI =
+      typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+          TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()));
   private List<String> columnNames;
   private List<TypeInfo> columnTypes;
   private ObjectInspector inspector;
@@ -118,7 +117,7 @@ public class KafkaJsonSerDe extends AbstractSerDe {
       LOG.debug("types: {}, {} ", columnTypeProperty, columnTypes);
     }
 
-    inspectors = columnTypes.stream().map(typeInfoToObjectInspector).collect(Collectors.toList());
+    inspectors = columnTypes.stream().map(TYPEINFO_TO_OI).collect(Collectors.toList());
     inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
   }
 
@@ -236,8 +235,8 @@ public class KafkaJsonSerDe extends AbstractSerDe {
     DateTimeParser
         timeOrOffset =
         new DateTimeFormatterBuilder().append(null,
-            new DateTimeParser[] { new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
-                new DateTimeFormatterBuilder().appendLiteral(' ').toParser() })
+            new DateTimeParser[] {new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
+                new DateTimeFormatterBuilder().appendLiteral(' ').toParser()})
             .appendOptional(ISODateTimeFormat.timeElementParser().getParser())
             .appendOptional(offsetElement.getParser())
             .toParser();
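For context on the timestamp handling touched above, the following is a hedged, self-contained sketch of the kind of lenient parser this builder feeds into (the offset element and the rest of createAutoParser are not visible in this hunk and are omitted; the class name AutoTimestampParserSketch and the sample timestamps are invented):

    import org.joda.time.format.DateTimeFormatter;
    import org.joda.time.format.DateTimeFormatterBuilder;
    import org.joda.time.format.DateTimeParser;
    import org.joda.time.format.ISODateTimeFormat;

    public class AutoTimestampParserSketch {
      public static void main(String[] args) {
        // Accept either 'T' or a single space between the date and an optional ISO time element.
        DateTimeParser timeOrSpace = new DateTimeFormatterBuilder()
            .append(null, new DateTimeParser[] {
                new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
                new DateTimeFormatterBuilder().appendLiteral(' ').toParser()})
            .appendOptional(ISODateTimeFormat.timeElementParser().getParser())
            .toParser();
        DateTimeFormatter autoParser = new DateTimeFormatterBuilder()
            .append(ISODateTimeFormat.dateElementParser().getParser())
            .appendOptional(timeOrSpace)
            .toFormatter()
            .withZoneUTC();
        // Both forms parse to the same instant.
        System.out.println(autoParser.parseDateTime("2018-10-13T17:48:21").getMillis());
        System.out.println(autoParser.parseDateTime("2018-10-13 17:48:21").getMillis());
      }
    }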

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
new file mode 100644
index 0000000..950f731
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Kafka Hive output format used to write Hive rows to a Kafka topic.
+ */
+public class KafkaOutputFormat implements HiveOutputFormat<NullWritable, KafkaWritable> {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaOutputFormat.class);
+
+  @Override public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc,
+      Path finalOutPath,
+      Class<? extends Writable> valueClass,
+      boolean isCompressed,
+      Properties tableProperties,
+      Progressable progress) {
+    final String topic = jc.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
+    final Boolean optimisticCommit = jc.getBoolean(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName(), true);
+    final WriteSemantic
+        writeSemantic =
+        WriteSemantic.valueOf(jc.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName()));
+    final Properties producerProperties = KafkaUtils.producerProperties(jc);
+    final FileSinkOperator.RecordWriter recordWriter;
+    switch (writeSemantic) {
+    case BEST_EFFORT:
+      recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), false, producerProperties);
+      break;
+    case AT_LEAST_ONCE:
+      recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), true, producerProperties);
+      break;
+    case EXACTLY_ONCE:
+      FileSystem fs;
+      try {
+        fs = finalOutPath.getFileSystem(jc);
+      } catch (IOException e) {
+        LOG.error("Can not construct file system instance", e);
+        throw new RuntimeException(e);
+      }
+      final String queryId = Preconditions.checkNotNull(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname, null));
+      recordWriter =
+          new TransactionalKafkaWriter(topic, producerProperties,
+              new Path(Preconditions.checkNotNull(finalOutPath), queryId),
+              fs,
+              optimisticCommit);
+      break;
+    default:
+      throw new IllegalArgumentException(String.format("Unknown delivery semantic [%s]", writeSemantic.toString()));
+    }
+    return recordWriter;
+  }
+
+  @Override public RecordWriter<NullWritable, KafkaWritable> getRecordWriter(FileSystem fileSystem,
+      JobConf jobConf,
+      String s,
+      Progressable progressable) {
+    throw new RuntimeException("this method is not supposed to be called; use getHiveRecordWriter instead");
+  }
+
+  @Override public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) {
+
+  }
+
+  /**
+   * Possible write semantics supported by the record writer.
+   */
+  enum WriteSemantic {
+    /**
+     * Best-effort delivery with no guarantees at all: users can set producer properties as they wish,
+     * and the writer carries on when possible unless the exception is fatal.
+     */
+    BEST_EFFORT,
+    /**
+     * Deliver every record at least once unless the job fails.
+     * Duplicates can therefore be introduced by lost ACKs or task retries.
+     * This is currently the default.
+     */
+    AT_LEAST_ONCE,
+    /**
+     * Deliver every record exactly once.
+     */
+    EXACTLY_ONCE,
+  }
+}
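As a small, hedged illustration of the dispatch performed in getHiveRecordWriter above: the delivery-guarantee name stored with the table is parsed into the WriteSemantic enum, which decides the record writer implementation. The sketch is written as if from within the same package, since WriteSemantic is package-private; the class name WriteSemanticSketch and the literal "AT_LEAST_ONCE" value are assumptions (the value would normally come from the JobConf).

    package org.apache.hadoop.hive.kafka;

    class WriteSemanticSketch {
      public static void main(String[] args) {
        KafkaOutputFormat.WriteSemantic semantic =
            KafkaOutputFormat.WriteSemantic.valueOf("AT_LEAST_ONCE");
        switch (semantic) {
        case BEST_EFFORT:
        case AT_LEAST_ONCE:
          // Both guarantees map to SimpleKafkaWriter; a boolean flag distinguishes them.
          System.out.println("SimpleKafkaWriter");
          break;
        case EXACTLY_ONCE:
          // Maps to TransactionalKafkaWriter, which also needs a working path on the
          // file system and the query id to track transaction state.
          System.out.println("TransactionalKafkaWriter");
          break;
        default:
          throw new IllegalArgumentException("Unknown delivery semantic " + semantic);
        }
      }
    }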

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
deleted file mode 100644
index 2d5637d..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputFormat.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-/**
- * Kafka puller input format to read records from a Kafka Queue.
- * The input split will contain the set of topic partition and start/end offsets.
- * Records will be returned as bytes array.
- */
-public class KafkaPullerInputFormat extends InputFormat<NullWritable, KafkaRecordWritable>
-    implements org.apache.hadoop.mapred.InputFormat<NullWritable, KafkaRecordWritable> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaPullerInputFormat.class);
-
-  @Override public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
-    List<KafkaPullerInputSplit> inputSplits;
-    try {
-      inputSplits = computeSplits(jobConf);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(e);
-    }
-    InputSplit[] inputSplitsArray = new InputSplit[inputSplits.size()];
-    return inputSplits.toArray(inputSplitsArray);
-  }
-
-  /**
-   * Build a full scan using Kafka list partition then beginning/end offsets.
-   * This function might block duo to calls like:
-   * org.apache.kafka.clients.consumer.KafkaConsumer#beginningOffsets(java.util.Collection)
-   *
-   * @param topic      kafka topic
-   * @param consumer   initialized kafka consumer
-   * @param tablePaths hive table path
-   *
-   * @return full scan input split collection based on Kafka metadata APIs
-   */
-  private static List<KafkaPullerInputSplit> buildFullScanFromKafka(String topic,
-      KafkaConsumer<byte[], byte[]> consumer,
-      Path[] tablePaths) {
-    final Map<TopicPartition, Long> starOffsetsMap;
-    final Map<TopicPartition, Long> endOffsetsMap;
-
-    final List<TopicPartition> topicPartitions;
-    topicPartitions = fetchTopicPartitions(topic, consumer);
-    starOffsetsMap = consumer.beginningOffsets(topicPartitions);
-    endOffsetsMap = consumer.endOffsets(topicPartitions);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.info("Found the following partitions [{}]",
-          topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",")));
-      starOffsetsMap.forEach((tp, start) -> LOG.info("TPartition [{}],Start offsets [{}]", tp, start));
-      endOffsetsMap.forEach((tp, end) -> LOG.info("TPartition [{}],End offsets [{}]", tp, end));
-    }
-    return topicPartitions.stream()
-        .map(topicPartition -> new KafkaPullerInputSplit(topicPartition.topic(),
-            topicPartition.partition(),
-            starOffsetsMap.get(topicPartition),
-            endOffsetsMap.get(topicPartition),
-            tablePaths[0]))
-        .collect(Collectors.toList());
-  }
-
-  private List<KafkaPullerInputSplit> computeSplits(Configuration configuration)
-      throws IOException, InterruptedException {
-    // this will be used to harness some KAFKA blocking calls
-    final ExecutorService execService = Executors.newSingleThreadExecutor();
-    try (KafkaConsumer consumer = new KafkaConsumer(KafkaStreamingUtils.consumerProperties(configuration))) {
-      final String topic = configuration.get(KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
-      final long
-          timeoutMs =
-          configuration.getLong(KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT,
-              KafkaStreamingUtils.DEFAULT_CONSUMER_POLL_TIMEOUT_MS);
-      // hive depends on FileSplits
-      JobConf jobConf = new JobConf(configuration);
-      Path[] tablePaths = org.apache.hadoop.mapred.FileInputFormat.getInputPaths(jobConf);
-
-      //noinspection unchecked
-      Future<List<KafkaPullerInputSplit>>
-          futureFullHouse =
-          execService.submit(() -> buildFullScanFromKafka(topic, consumer, tablePaths));
-      List<KafkaPullerInputSplit> fullHouse;
-      try {
-        fullHouse = futureFullHouse.get(timeoutMs, TimeUnit.MILLISECONDS);
-      } catch (TimeoutException | ExecutionException e) {
-        futureFullHouse.cancel(true);
-        LOG.error("can not generate full scan split", e);
-        // at this point we can not go further fail split generation
-        throw new IOException(e);
-      }
-
-      @SuppressWarnings("unchecked") final ImmutableMap.Builder<TopicPartition, KafkaPullerInputSplit>
-          fullHouseMapBuilder =
-          new ImmutableMap.Builder();
-      fullHouse.forEach(input -> fullHouseMapBuilder.put(new TopicPartition(input.getTopic(), input.getPartition()),
-          input));
-
-      final KafkaScanTrimmer kafkaScanTrimmer = new KafkaScanTrimmer(fullHouseMapBuilder.build(), consumer);
-      final String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-
-      if (filterExprSerialized != null && !filterExprSerialized.isEmpty()) {
-        ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
-        LOG.info("Kafka trimmer working on Filter tree {}", filterExpr.getExprString());
-        Callable<List<KafkaPullerInputSplit>>
-            trimmerWorker = () -> kafkaScanTrimmer.computeOptimizedScan(filterExpr)
-                .entrySet()
-                .stream()
-                .map(Map.Entry::getValue)
-                .collect(Collectors.toList());
-
-        Future<List<KafkaPullerInputSplit>> futureTinyHouse = execService.submit(trimmerWorker);
-        try {
-          return futureTinyHouse.get(timeoutMs, TimeUnit.MILLISECONDS)
-              .stream()
-              // filter out empty splits
-              .filter(split -> split.getStartOffset() < split.getEndOffset())
-              .collect(Collectors.toList());
-        } catch (ExecutionException | TimeoutException e) {
-          futureTinyHouse.cancel(true);
-          LOG.error("Had issue with trimmer will return full scan ", e);
-          return fullHouse;
-        }
-      }
-      //Case null: it can be filter evaluated to false or no filter at all thus return full scan
-      return fullHouse;
-    } finally {
-      execService.shutdown();
-    }
-  }
-
-  private static List<TopicPartition> fetchTopicPartitions(String topic, KafkaConsumer<byte[], byte[]> consumer) {
-    // this will block till REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms"
-    // then throws org.apache.kafka.common.errors.TimeoutException if can not fetch metadata
-    // @TODO add retry logic maybe
-    List<PartitionInfo> partitions = consumer.partitionsFor(topic);
-    return partitions.stream().map(p -> new TopicPartition(topic, p.partition())).collect(Collectors.toList());
-  }
-
-  @Override public RecordReader<NullWritable, KafkaRecordWritable> getRecordReader(InputSplit inputSplit,
-      JobConf jobConf,
-      Reporter reporter) {
-    return new KafkaPullerRecordReader((KafkaPullerInputSplit) inputSplit, jobConf);
-  }
-
-  @Override public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext jobContext)
-      throws IOException, InterruptedException {
-    return computeSplits(jobContext.getConfiguration()).stream()
-        .map(kafkaPullerInputSplit -> (org.apache.hadoop.mapreduce.InputSplit) kafkaPullerInputSplit)
-        .collect(Collectors.toList());
-  }
-
-  @Override public org.apache.hadoop.mapreduce.RecordReader<NullWritable, KafkaRecordWritable> createRecordReader(
-      org.apache.hadoop.mapreduce.InputSplit inputSplit,
-      TaskAttemptContext taskAttemptContext) {
-    return new KafkaPullerRecordReader();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
deleted file mode 100644
index 697469c..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerInputSplit.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-import javax.annotation.Nullable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Kafka Hadoop Input Split Class.
- */
-@SuppressWarnings("WeakerAccess") public class KafkaPullerInputSplit extends FileSplit
-    implements org.apache.hadoop.mapred.InputSplit {
-  private String topic;
-  private long startOffset;
-  private int partition;
-  private long endOffset;
-
-  public KafkaPullerInputSplit() {
-    super(null, 0, 0, (String[]) null);
-  }
-
-  public KafkaPullerInputSplit(String topic, int partition, long startOffset, long endOffset, Path dummyPath) {
-    super(dummyPath, 0, 0, (String[]) null);
-    this.topic = topic;
-    this.startOffset = startOffset;
-    this.partition = partition;
-    this.endOffset = endOffset;
-    Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
-        "start [%s] has to be positive and >= end [%]",
-        startOffset,
-        endOffset);
-  }
-
-  @Override public long getLength() {
-    return 0;
-  }
-
-  @Override public String[] getLocations() {
-    return new String[0];
-  }
-
-  @Override public void write(DataOutput dataOutput) throws IOException {
-    super.write(dataOutput);
-    dataOutput.writeUTF(topic);
-    dataOutput.writeInt(partition);
-    dataOutput.writeLong(startOffset);
-    dataOutput.writeLong(endOffset);
-  }
-
-  @Override public void readFields(DataInput dataInput) throws IOException {
-    super.readFields(dataInput);
-    topic = dataInput.readUTF();
-    partition = dataInput.readInt();
-    startOffset = dataInput.readLong();
-    endOffset = dataInput.readLong();
-    Preconditions.checkArgument(startOffset >= 0 && startOffset <= endOffset,
-        "start [%s] has to be positive and >= end [%]",
-        startOffset,
-        endOffset);
-  }
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public int getPartition() {
-    return partition;
-  }
-
-  public long getStartOffset() {
-    return startOffset;
-  }
-
-  public long getEndOffset() {
-    return endOffset;
-  }
-
-  /**
-   * Compute the intersection of 2 splits. Splits must share the same topic and partition number.
-   *
-   * @param split1 left split
-   * @param split2 right split
-   *
-   * @return new split that represents range intersection or null if it is not overlapping
-   */
-  @Nullable public static KafkaPullerInputSplit intersectRange(KafkaPullerInputSplit split1,
-      KafkaPullerInputSplit split2) {
-    assert (split1.topic.equals(split2.topic));
-    assert (split1.partition == split2.partition);
-    final long startOffset = Math.max(split1.getStartOffset(), split2.getStartOffset());
-    final long endOffset = Math.min(split1.getEndOffset(), split2.getEndOffset());
-    if (startOffset > endOffset) {
-      // there is no overlapping
-      return null;
-    }
-    return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
-  }
-
-  /**
-   * Compute union of ranges between splits. Splits must share the same topic and partition
-   *
-   * @param split1 left split
-   * @param split2 right split
-   *
-   * @return new split with a range including both splits.
-   */
-  public static KafkaPullerInputSplit unionRange(KafkaPullerInputSplit split1, KafkaPullerInputSplit split2) {
-    assert (split1.topic.equals(split2.topic));
-    assert (split1.partition == split2.partition);
-    final long startOffset = Math.min(split1.getStartOffset(), split2.getStartOffset());
-    final long endOffset = Math.max(split1.getEndOffset(), split2.getEndOffset());
-    return new KafkaPullerInputSplit(split1.topic, split1.partition, startOffset, endOffset, split1.getPath());
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof KafkaPullerInputSplit)) {
-      return false;
-    }
-    KafkaPullerInputSplit that = (KafkaPullerInputSplit) o;
-    return Objects.equal(getTopic(), that.getTopic())
-        && Objects.equal(getStartOffset(), that.getStartOffset())
-        && Objects.equal(getPartition(), that.getPartition())
-        && Objects.equal(getEndOffset(), that.getEndOffset());
-  }
-
-  @Override public int hashCode() {
-    return Objects.hashCode(getTopic(), getStartOffset(), getPartition(), getEndOffset());
-  }
-
-  @Override public String toString() {
-    return "KafkaPullerInputSplit{"
-        + "topic='"
-        + topic
-        + '\''
-        + ", startOffset="
-        + startOffset
-        + ", partition="
-        + partition
-        + ", endOffset="
-        + endOffset
-        + ", path="
-        + super.getPath().toString()
-        + '}';
-  }
-
-  public static KafkaPullerInputSplit copyOf(KafkaPullerInputSplit other) {
-    return new KafkaPullerInputSplit(other.getTopic(),
-        other.getPartition(),
-        other.getStartOffset(),
-        other.getEndOffset(),
-        other.getPath());
-  }
-
-  @SuppressWarnings("MethodDoesntCallSuperMethod") public KafkaPullerInputSplit clone() {
-    return copyOf(this);
-  }
-
-  public static List<KafkaPullerInputSplit> slice(long sliceSize, final KafkaPullerInputSplit split) {
-    if (split.getEndOffset() - split.getStartOffset() > sliceSize) {
-      ImmutableList.Builder<KafkaPullerInputSplit> builder = ImmutableList.builder();
-      long start = split.getStartOffset();
-      while (start < split.getEndOffset() - sliceSize) {
-        builder.add(new KafkaPullerInputSplit(split.topic,
-            split.partition,
-            start,
-            start + sliceSize + 1,
-            split.getPath()));
-        start += sliceSize + 1;
-      }
-      // last split
-      if (start < split.getEndOffset()) {
-        builder.add(new KafkaPullerInputSplit(split.topic,
-            split.partition,
-            start,
-            split.getEndOffset(),
-            split.getPath()));
-      }
-      return builder.build();
-    }
-
-    return Collections.singletonList(copyOf(split));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
deleted file mode 100644
index 06a10b4..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaPullerRecordReader.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Properties;
-
-/**
- * Kafka Records Reader implementation.
- */
-@SuppressWarnings("UnstableApiUsage") public class KafkaPullerRecordReader extends RecordReader<NullWritable, KafkaRecordWritable>
-    implements org.apache.hadoop.mapred.RecordReader<NullWritable, KafkaRecordWritable> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaPullerRecordReader.class);
-
-  private KafkaConsumer<byte[], byte[]> consumer = null;
-  private Configuration config = null;
-  private KafkaRecordWritable currentWritableValue;
-  private Iterator<ConsumerRecord<byte[], byte[]>> recordsCursor = null;
-
-  private long totalNumberRecords = 0L;
-  private long consumedRecords = 0L;
-  private long readBytes = 0L;
-  private volatile boolean started = false;
-  private long startOffset = -1L;
-  private long endOffset = Long.MAX_VALUE;
-
-  @SuppressWarnings("WeakerAccess") public KafkaPullerRecordReader() {
-  }
-
-  private void initConsumer() {
-    if (consumer == null) {
-      LOG.info("Initializing Kafka Consumer");
-      final Properties properties = KafkaStreamingUtils.consumerProperties(config);
-      String brokerString = properties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
-      Preconditions.checkNotNull(brokerString, "broker end point can not be null");
-      LOG.info("Starting Consumer with Kafka broker string [{}]", brokerString);
-      consumer = new KafkaConsumer<>(properties);
-    }
-  }
-
-  @SuppressWarnings("WeakerAccess") public KafkaPullerRecordReader(KafkaPullerInputSplit inputSplit,
-      Configuration jobConf) {
-    initialize(inputSplit, jobConf);
-  }
-
-  private synchronized void initialize(KafkaPullerInputSplit inputSplit, Configuration jobConf) {
-    if (!started) {
-      this.config = jobConf;
-      startOffset = inputSplit.getStartOffset();
-      endOffset = inputSplit.getEndOffset();
-      TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
-      Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset,
-          "Start [%s] has to be positive and less or equal than End [%s]", startOffset, endOffset);
-      totalNumberRecords += endOffset - startOffset;
-      initConsumer();
-      long
-          pollTimeout =
-          config.getLong(KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT,
-              KafkaStreamingUtils.DEFAULT_CONSUMER_POLL_TIMEOUT_MS);
-      LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
-      this.recordsCursor =
-          startOffset == endOffset ?
-              new EmptyIterator() :
-              new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
-      started = true;
-    }
-  }
-
-  @Override public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit,
-      TaskAttemptContext context) {
-    initialize((KafkaPullerInputSplit) inputSplit, context.getConfiguration());
-  }
-
-  @Override public boolean next(NullWritable nullWritable, KafkaRecordWritable bytesWritable) {
-    if (started && recordsCursor.hasNext()) {
-      ConsumerRecord<byte[], byte[]> record = recordsCursor.next();
-      bytesWritable.set(record, startOffset, endOffset);
-      consumedRecords += 1;
-      readBytes += record.serializedValueSize();
-      return true;
-    }
-    return false;
-  }
-
-  @Override public NullWritable createKey() {
-    return NullWritable.get();
-  }
-
-  @Override public KafkaRecordWritable createValue() {
-    return new KafkaRecordWritable();
-  }
-
-  @Override public long getPos() {
-    return -1;
-  }
-
-  @Override public boolean nextKeyValue() {
-    currentWritableValue = new KafkaRecordWritable();
-    if (next(NullWritable.get(), currentWritableValue)) {
-      return true;
-    }
-    currentWritableValue = null;
-    return false;
-  }
-
-  @Override public NullWritable getCurrentKey() {
-    return NullWritable.get();
-  }
-
-  @Override public KafkaRecordWritable getCurrentValue() {
-    return Preconditions.checkNotNull(currentWritableValue);
-  }
-
-  @Override public float getProgress() {
-    if (consumedRecords == 0) {
-      return 0f;
-    }
-    if (consumedRecords >= totalNumberRecords) {
-      return 1f;
-    }
-    return consumedRecords * 1.0f / totalNumberRecords;
-  }
-
-  @Override public void close() {
-    LOG.trace("total read bytes [{}]", readBytes);
-    if (consumer != null) {
-      consumer.wakeup();
-      consumer.close();
-    }
-  }
-
-  /**
-   * Empty iterator for empty splits when startOffset == endOffset, this is added to avoid clumsy if condition.
-   */
-  private static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
-    @Override public boolean hasNext() {
-      return false;
-    }
-
-    @Override public ConsumerRecord<byte[], byte[]> next() {
-      throw new IllegalStateException("this is an empty iterator");
-    }
-  }
-}