Posted to commits@hive.apache.org by ha...@apache.org on 2018/10/13 17:48:20 UTC

[4/5] hive git commit: HIVE-20639 : Add ability to Write Data from Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index c252455..2225f19 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -25,24 +25,31 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Iterator over Kafka Records to read records from a single topic partition inclusive start, exclusive end.
+ * This class implements an Iterator over a single Kafka topic partition.
+ *
+ * Notes:
+ * The user of this class must provide a working Kafka consumer and is responsible for closing it afterwards.
+ * The user of this class is responsible for thread safety if the provided consumer is shared across threads.
+ *
  */
-public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordIterator.class);
   private static final String
       POLL_TIMEOUT_HINT =
       String.format("Try increasing poll timeout using Hive Table property [%s]",
-          KafkaStreamingUtils.HIVE_KAFKA_POLL_TIMEOUT);
+          KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName());
   private static final String
       ERROR_POLL_TIMEOUT_FORMAT =
       "Consumer returned [0] record due to exhausted poll timeout [%s]ms from TopicPartition:[%s] "
@@ -54,30 +61,38 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
   private final long endOffset;
   private final long startOffset;
   private final long pollTimeoutMs;
+  private final Duration pollTimeoutDurationMs;
   private final Stopwatch stopwatch = Stopwatch.createUnstarted();
   private ConsumerRecords<byte[], byte[]> records;
+  /**
+   * Holds the kafka consumer position after the last poll() call.
+   */
   private long consumerPosition;
   private ConsumerRecord<byte[], byte[]> nextRecord;
   private boolean hasMore = true;
+  /**
+   * Each Kafka consumer poll() call returns a batch of records; this iterator is used to loop over that batch.
+   */
   private Iterator<ConsumerRecord<byte[], byte[]>> consumerRecordIterator = null;
 
   /**
-   * Iterator over Kafka Records over a single {@code topicPartition} inclusive {@code startOffset},
-   * up to exclusive {@code endOffset}.
-   * <p>
-   * If {@code requestedStartOffset} is not null will seek up to that offset
-   * Else If {@code requestedStartOffset} is null will seek to beginning see
-   * {@link org.apache.kafka.clients.consumer.Consumer#seekToBeginning(java.util.Collection)}
-   * <p>
-   * When provided with {@code requestedEndOffset}, will return records up to consumer position == endOffset
-   * Else If {@code requestedEndOffset} is null it will read up to the current end of the stream
-   * {@link org.apache.kafka.clients.consumer.Consumer#seekToEnd(java.util.Collection)}
-   *  <p>
-   * @param consumer       functional kafka consumer.
-   * @param topicPartition kafka topic partition.
-   * @param requestedStartOffset    requested start position.
-   * @param requestedEndOffset      requested end position. If null will read up to current last
-   * @param pollTimeoutMs  poll time out in ms.
+   * Kafka record Iterator pulling records from a single {@code topicPartition}, from the inclusive
+   * {@code requestedStartOffset} up to the exclusive {@code requestedEndOffset}.
+   *
+   * This iterator can block on polling for up to the designated timeout.
+   *
+   * If the brokers return no record within the poll timeout, the condition is treated as an exception.
+   * The timeout exception is a retriable exception, therefore users of this class can retry if needed.
+   *
+   * @param consumer       Working Kafka consumer; the user must initialize it and close it.
+   * @param topicPartition Target Kafka topic partition.
+   * @param requestedStartOffset    Requested start offset position; if NULL the iterator will seek to the beginning
+   *                                using {@link Consumer#seekToBeginning(java.util.Collection)}.
+   *
+   * @param requestedEndOffset      Requested end position; if null it will read up to the last available offset,
+   *                                as given by:
+   *                                {@link Consumer#seekToEnd(java.util.Collection)}.
+   * @param pollTimeoutMs  Positive number indicating the poll timeout in ms.
    */
   KafkaRecordIterator(Consumer<byte[], byte[]> consumer,
       TopicPartition topicPartition,
@@ -87,6 +102,7 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
     this.consumer = Preconditions.checkNotNull(consumer, "Consumer can not be null");
     this.topicPartition = Preconditions.checkNotNull(topicPartition, "Topic partition can not be null");
     this.pollTimeoutMs = pollTimeoutMs;
+    this.pollTimeoutDurationMs = Duration.ofMillis(pollTimeoutMs);
     Preconditions.checkState(this.pollTimeoutMs > 0, "Poll timeout has to be positive number");
     final List<TopicPartition> topicPartitionList = Collections.singletonList(topicPartition);
     // assign topic partition to consumer
@@ -138,7 +154,12 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
   }
 
   /**
-   * @throws IllegalStateException if the kafka consumer poll call can not reach the target offset.
+   * Checks whether there are more records to be consumed, and pulls more from the broker if the current batch is empty.
+   * This method might block for up to {@link #pollTimeoutMs} ms to pull records from the Kafka broker.
+   *
+   * @throws PollTimeoutException if poll returns no records and the consumer position did not reach the requested endOffset.
+   * Such an exception is retriable and may be transient, so a retry may succeed.
+   *
    * @return true if has more records to be consumed.
    */
   @Override public boolean hasNext() {
@@ -158,20 +179,20 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
   /**
    * Poll more records from the Kafka Broker.
    *
-   * @throws IllegalStateException if no records returned before consumer position reaches target end offset.
+   * @throws PollTimeoutException if poll returns no records and the consumer's position < requested endOffset.
    */
   private void pollRecords() {
     if (LOG.isTraceEnabled()) {
       stopwatch.reset().start();
     }
-    records = consumer.poll(pollTimeoutMs);
+    records = consumer.poll(pollTimeoutDurationMs);
     if (LOG.isTraceEnabled()) {
       stopwatch.stop();
       LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
     }
     // Fail if we can not poll within one lap of pollTimeoutMs.
     if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
-      throw new IllegalStateException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
+      throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
           pollTimeoutMs,
           topicPartition.toString(),
           startOffset,
@@ -201,4 +222,12 @@ public class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte
       nextRecord = null;
     }
   }
+
+  static final class PollTimeoutException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    PollTimeoutException(String message) {
+      super(message);
+    }
+  }
 }
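
A minimal usage sketch for the iterator above, not part of the patch: it assumes a reachable broker at localhost:9092 and a hypothetical topic, and lives in the same package because both the class and PollTimeoutException are package-private. The retry loop shows how a caller can handle the retriable timeout:

    package org.apache.hadoop.hive.kafka;

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class KafkaRecordIteratorSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                       // hypothetical broker
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        // The iterator does not own the consumer: the caller creates and closes it.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          TopicPartition tp = new TopicPartition("test_topic", 0);              // hypothetical topic/partition
          for (int attempt = 0; attempt < 2; attempt++) {                       // one retry on timeout
            try {
              // Read offsets [0, 100) with a 5 second poll timeout.
              KafkaRecordIterator it = new KafkaRecordIterator(consumer, tp, 0L, 100L, 5000L);
              while (it.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = it.next();
                System.out.println(record.offset() + " -> " + record.serializedValueSize() + " bytes");
              }
              break;
            } catch (KafkaRecordIterator.PollTimeoutException e) {
              // Retriable: the broker returned no records within the poll timeout.
            }
          }
        }
      }
    }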

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
new file mode 100644
index 0000000..746de61
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+/**
+ * Kafka Records Reader implementation.
+ */
+@SuppressWarnings("WeakerAccess") public class KafkaRecordReader extends RecordReader<NullWritable, KafkaWritable>
+    implements org.apache.hadoop.mapred.RecordReader<NullWritable, KafkaWritable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
+
+  private KafkaConsumer<byte[], byte[]> consumer = null;
+  private Configuration config = null;
+  private KafkaWritable currentWritableValue;
+  private Iterator<ConsumerRecord<byte[], byte[]>> recordsCursor = null;
+
+  private long totalNumberRecords = 0L;
+  private long consumedRecords = 0L;
+  private long readBytes = 0L;
+  private volatile boolean started = false;
+  private long startOffset = -1L;
+  private long endOffset = Long.MAX_VALUE;
+
+  @SuppressWarnings("WeakerAccess") public KafkaRecordReader() {
+  }
+
+  private void initConsumer() {
+    if (consumer == null) {
+      LOG.info("Initializing Kafka Consumer");
+      final Properties properties = KafkaUtils.consumerProperties(config);
+      String brokerString = properties.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+      Preconditions.checkNotNull(brokerString, "broker end point can not be null");
+      LOG.info("Starting Consumer with Kafka broker string [{}]", brokerString);
+      consumer = new KafkaConsumer<>(properties);
+    }
+  }
+
+  @SuppressWarnings("WeakerAccess") public KafkaRecordReader(KafkaInputSplit inputSplit,
+      Configuration jobConf) {
+    initialize(inputSplit, jobConf);
+  }
+
+  private synchronized void initialize(KafkaInputSplit inputSplit, Configuration jobConf) {
+    if (!started) {
+      this.config = jobConf;
+      startOffset = inputSplit.getStartOffset();
+      endOffset = inputSplit.getEndOffset();
+      TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
+      Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset,
+          "Start [%s] has to be positive and less or equal than End [%s]",
+          startOffset,
+          endOffset);
+      totalNumberRecords += endOffset - startOffset;
+      initConsumer();
+      long
+          pollTimeout =
+          config.getLong(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(), -1);
+      LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
+      this.recordsCursor =
+          startOffset == endOffset ?
+              new EmptyIterator() :
+              new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
+      started = true;
+    }
+  }
+
+  @Override public void initialize(org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext context) {
+    initialize((KafkaInputSplit) inputSplit, context.getConfiguration());
+  }
+
+  @Override public boolean next(NullWritable nullWritable, KafkaWritable bytesWritable) {
+    if (started && recordsCursor.hasNext()) {
+      ConsumerRecord<byte[], byte[]> record = recordsCursor.next();
+      bytesWritable.set(record, startOffset, endOffset);
+      consumedRecords += 1;
+      readBytes += record.serializedValueSize();
+      return true;
+    }
+    return false;
+  }
+
+  @Override public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override public KafkaWritable createValue() {
+    return new KafkaWritable();
+  }
+
+  @Override public long getPos() {
+    return -1;
+  }
+
+  @Override public boolean nextKeyValue() {
+    currentWritableValue = new KafkaWritable();
+    if (next(NullWritable.get(), currentWritableValue)) {
+      return true;
+    }
+    currentWritableValue = null;
+    return false;
+  }
+
+  @Override public NullWritable getCurrentKey() {
+    return NullWritable.get();
+  }
+
+  @Override public KafkaWritable getCurrentValue() {
+    return Preconditions.checkNotNull(currentWritableValue);
+  }
+
+  @Override public float getProgress() {
+    if (consumedRecords == 0) {
+      return 0f;
+    }
+    if (consumedRecords >= totalNumberRecords) {
+      return 1f;
+    }
+    return consumedRecords * 1.0f / totalNumberRecords;
+  }
+
+  @Override public void close() {
+    LOG.trace("total read bytes [{}]", readBytes);
+    if (consumer != null) {
+      consumer.wakeup();
+      consumer.close();
+    }
+  }
+
+  /**
+   * Empty iterator for empty splits where startOffset == endOffset; this avoids a clumsy special-case if condition.
+   */
+  private static final class EmptyIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+    @Override public boolean hasNext() {
+      return false;
+    }
+
+    @Override public ConsumerRecord<byte[], byte[]> next() {
+      throw new IllegalStateException("this is an empty iterator");
+    }
+  }
+}
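
A sketch, not part of the patch, of driving the reader above outside of Hive through the old-style mapred next() loop. The split constructor mirrors the KafkaInputSplit usages later in this commit; the two configuration keys are assumptions, since the authoritative names live in KafkaTableProperties, which is outside this excerpt:

    package org.apache.hadoop.hive.kafka;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;

    public class KafkaRecordReaderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("kafka.bootstrap.servers", "localhost:9092");   // assumed property name, hypothetical broker
        conf.setLong("hive.kafka.poll.timeout.ms", 5000L);        // assumed property name for the poll timeout

        // Read offsets [0, 50) of partition 0 from a hypothetical topic; the Path is split bookkeeping only.
        KafkaInputSplit split = new KafkaInputSplit("test_topic", 0, 0L, 50L, new Path("/tmp/kafka_table"));
        long count = 0L;
        try (KafkaRecordReader reader = new KafkaRecordReader(split, conf)) {
          KafkaWritable value = reader.createValue();
          while (reader.next(NullWritable.get(), value)) {
            count++;
          }
          System.out.printf("read %d records, progress = %.2f%n", count, reader.getProgress());
        }
      }
    }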

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
deleted file mode 100644
index 1b00f85..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordWritable.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import javax.annotation.Nullable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Objects;
-
-/**
- * Writable implementation of Kafka ConsumerRecord.
- * Serialized in the form
- * {@code timestamp} (long) | {@code partition} (int) | {@code offset} (long) |
- * {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) |
- * {@code value} (byte[]) | {@code recordKey.size()} (int) | {@code recordKey} (byte[])
- */
-public class KafkaRecordWritable implements Writable {
-
-  /**
-   * Kafka partition id
-   */
-  private int partition;
-  /**
-   * Record Offset
-   */
-  private long offset;
-  /**
-   * First offset given by the input split used to pull the event {@link KafkaPullerInputSplit#getStartOffset()}
-   */
-  private long startOffset;
-  /**
-   * Last Offset given by the input split used to pull the event {@link KafkaPullerInputSplit#getEndOffset()}
-   */
-  private long endOffset;
-  /**
-   * Event timestamp provided by Kafka Record {@link ConsumerRecord#timestamp()}
-   */
-  private long timestamp;
-  /**
-   * Record value
-   */
-  private byte[] value;
-
-  /**
-   * Record key content or null
-   */
-  private byte[] recordKey;
-
-
-  void set(ConsumerRecord<byte[], byte[]> consumerRecord, long startOffset, long endOffset) {
-    this.partition = consumerRecord.partition();
-    this.timestamp = consumerRecord.timestamp();
-    this.offset = consumerRecord.offset();
-    this.value = consumerRecord.value();
-    this.recordKey = consumerRecord.key();
-    this.startOffset = startOffset;
-    this.endOffset = endOffset;
-  }
-
-   KafkaRecordWritable(int partition,
-       long offset,
-       long timestamp,
-       byte[] value,
-       long startOffset,
-       long endOffset,
-       @Nullable byte[] recordKey) {
-    this.partition = partition;
-    this.offset = offset;
-    this.timestamp = timestamp;
-    this.value = value;
-    this.recordKey = recordKey;
-    this.startOffset = startOffset;
-    this.endOffset = endOffset;
-  }
-
-  @SuppressWarnings("WeakerAccess") public KafkaRecordWritable() {
-  }
-
-  @Override public void write(DataOutput dataOutput) throws IOException {
-    dataOutput.writeLong(timestamp);
-    dataOutput.writeInt(partition);
-    dataOutput.writeLong(offset);
-    dataOutput.writeLong(startOffset);
-    dataOutput.writeLong(endOffset);
-    dataOutput.writeInt(value.length);
-    dataOutput.write(value);
-    if (recordKey != null) {
-      dataOutput.writeInt(recordKey.length);
-      dataOutput.write(recordKey);
-    } else {
-      dataOutput.writeInt(-1);
-    }
-  }
-
-  @Override public void readFields(DataInput dataInput) throws IOException {
-    timestamp = dataInput.readLong();
-    partition = dataInput.readInt();
-    offset = dataInput.readLong();
-    startOffset = dataInput.readLong();
-    endOffset = dataInput.readLong();
-    int dataSize = dataInput.readInt();
-    if (dataSize > 0) {
-      value = new byte[dataSize];
-      dataInput.readFully(value);
-    } else {
-      value = new byte[0];
-    }
-    int keyArraySize = dataInput.readInt();
-    if (keyArraySize > -1) {
-      recordKey = new byte[keyArraySize];
-      dataInput.readFully(recordKey);
-    } else {
-      recordKey = null;
-    }
-  }
-
-  int getPartition() {
-    return partition;
-  }
-
-  long getOffset() {
-    return offset;
-  }
-
-  long getTimestamp() {
-    return timestamp;
-  }
-
-  byte[] getValue() {
-    return value;
-  }
-
-  long getStartOffset() {
-    return startOffset;
-  }
-
-  long getEndOffset() {
-    return endOffset;
-  }
-
-  @Nullable
-  byte[] getRecordKey() {
-    return recordKey;
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof KafkaRecordWritable)) {
-      return false;
-    }
-    KafkaRecordWritable writable = (KafkaRecordWritable) o;
-    return partition == writable.partition
-        && offset == writable.offset
-        && startOffset == writable.startOffset
-        && endOffset == writable.endOffset
-        && timestamp == writable.timestamp
-        && Arrays.equals(value, writable.value)
-        && Arrays.equals(recordKey, writable.recordKey);
-  }
-
-  @Override public int hashCode() {
-    int result = Objects.hash(partition, offset, startOffset, endOffset, timestamp);
-    result = 31 * result + Arrays.hashCode(value);
-    result = 31 * result + Arrays.hashCode(recordKey);
-    return result;
-  }
-
-  @Override public String toString() {
-    return "KafkaRecordWritable{"
-        + "partition="
-        + partition
-        + ", offset="
-        + offset
-        + ", startOffset="
-        + startOffset
-        + ", endOffset="
-        + endOffset
-        + ", timestamp="
-        + timestamp
-        + ", value="
-        + Arrays.toString(value)
-        + ", recordKey="
-        + Arrays.toString(recordKey)
-        + '}';
-  }
-}
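
The byte layout documented at the top of this removed class is plain Hadoop Writable serialization (the class's role is taken over by KafkaWritable elsewhere in this commit). A round-trip sketch against the class as it stood, with hypothetical field values, illustrates that layout:

    package org.apache.hadoop.hive.kafka;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.charset.StandardCharsets;

    public class KafkaRecordWritableRoundTrip {
      public static void main(String[] args) throws Exception {
        byte[] value = "payload".getBytes(StandardCharsets.UTF_8);
        byte[] key = "k1".getBytes(StandardCharsets.UTF_8);
        // partition 0, offset 7, timestamp 1234, split range [0, 100)
        KafkaRecordWritable original = new KafkaRecordWritable(0, 7L, 1234L, value, 0L, 100L, key);

        // write() emits: timestamp | partition | offset | startOffset | endOffset | value.size | value | key.size | key
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        KafkaRecordWritable copy = new KafkaRecordWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(original.equals(copy));  // true: every field survives the round trip
      }
    }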

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
index 8fbdfda..256796a 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaScanTrimmer.java
@@ -63,14 +63,14 @@ import java.util.function.Predicate;
  */
 class KafkaScanTrimmer {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaScanTrimmer.class);
-  private final Map<TopicPartition, KafkaPullerInputSplit> fullHouse;
+  private final Map<TopicPartition, KafkaInputSplit> fullHouse;
   private final KafkaConsumer kafkaConsumer;
 
   /**
    * @param fullHouse     initial full scan to be pruned, this is a map of Topic partition to input split.
    * @param kafkaConsumer kafka consumer used to pull offsets for time filter if needed
    */
-  KafkaScanTrimmer(Map<TopicPartition, KafkaPullerInputSplit> fullHouse, KafkaConsumer kafkaConsumer) {
+  KafkaScanTrimmer(Map<TopicPartition, KafkaInputSplit> fullHouse, KafkaConsumer kafkaConsumer) {
     this.fullHouse = fullHouse;
     this.kafkaConsumer = kafkaConsumer;
   }
@@ -83,8 +83,8 @@ class KafkaScanTrimmer {
    *
   * @return tiny house of the full house based on the filter expression
    */
-  Map<TopicPartition, KafkaPullerInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) {
-    Map<TopicPartition, KafkaPullerInputSplit> optimizedScan = parseAndOptimize(filterExpression);
+  Map<TopicPartition, KafkaInputSplit> computeOptimizedScan(ExprNodeGenericFuncDesc filterExpression) {
+    Map<TopicPartition, KafkaInputSplit> optimizedScan = parseAndOptimize(filterExpression);
 
     if (LOG.isDebugEnabled()) {
       if (optimizedScan != null) {
@@ -113,7 +113,7 @@ class KafkaScanTrimmer {
    *
    * @return Map of optimized kafka range scans or null if it is impossible to optimize.
    */
-  @Nullable private Map<TopicPartition, KafkaPullerInputSplit> parseAndOptimize(ExprNodeDesc expression) {
+  @Nullable private Map<TopicPartition, KafkaInputSplit> parseAndOptimize(ExprNodeDesc expression) {
     if (expression.getClass() != ExprNodeGenericFuncDesc.class) {
       return null;
     }
@@ -154,7 +154,7 @@ class KafkaScanTrimmer {
    *
    * @return leaf scan or null if can not figure out push down
    */
-  @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushLeaf(ExprNodeGenericFuncDesc expr,
+  @Nullable private Map<TopicPartition, KafkaInputSplit> pushLeaf(ExprNodeGenericFuncDesc expr,
       PredicateLeaf.Operator operator,
       boolean negation) {
     if (expr.getChildren().size() != 2) {
@@ -192,8 +192,7 @@ class KafkaScanTrimmer {
       constantDesc = (ExprNodeConstantDesc) extracted[0];
     }
 
-
-    if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.PARTITION.getName())) {
+    if (columnDesc.getColumn().equals(MetadataColumn.PARTITION.getName())) {
       return buildScanFromPartitionPredicate(fullHouse,
           operator,
           ((Number) constantDesc.getValue()).intValue(),
@@ -201,7 +200,7 @@ class KafkaScanTrimmer {
           negation);
 
     }
-    if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.OFFSET.getName())) {
+    if (columnDesc.getColumn().equals(MetadataColumn.OFFSET.getName())) {
       return buildScanFromOffsetPredicate(fullHouse,
           operator,
           ((Number) constantDesc.getValue()).longValue(),
@@ -209,7 +208,7 @@ class KafkaScanTrimmer {
           negation);
     }
 
-    if (columnDesc.getColumn().equals(KafkaStreamingUtils.MetadataColumn.TIMESTAMP.getName())) {
+    if (columnDesc.getColumn().equals(MetadataColumn.TIMESTAMP.getName())) {
       long timestamp = ((Number) constantDesc.getValue()).longValue();
       //noinspection unchecked
       return buildScanForTimesPredicate(fullHouse, operator, timestamp, flip, negation, kafkaConsumer);
@@ -229,8 +228,8 @@ class KafkaScanTrimmer {
    * @return filtered kafka scan
    */
 
-  @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromPartitionPredicate(
-      Map<TopicPartition, KafkaPullerInputSplit> fullScan,
+  @VisibleForTesting static Map<TopicPartition, KafkaInputSplit> buildScanFromPartitionPredicate(Map<TopicPartition,
+      KafkaInputSplit> fullScan,
       PredicateLeaf.Operator operator,
       int partitionConst,
       boolean flip,
@@ -262,12 +261,12 @@ class KafkaScanTrimmer {
       predicate = topicPartition -> true;
     }
 
-    ImmutableMap.Builder<TopicPartition, KafkaPullerInputSplit> builder = ImmutableMap.builder();
+    ImmutableMap.Builder<TopicPartition, KafkaInputSplit> builder = ImmutableMap.builder();
     // Filter full scan based on predicate
     fullScan.entrySet()
         .stream()
         .filter(entry -> predicate.test(entry.getKey()))
-        .forEach(entry -> builder.put(entry.getKey(), entry.getValue().clone()));
+        .forEach(entry -> builder.put(entry.getKey(), KafkaInputSplit.copyOf(entry.getValue())));
     return builder.build();
   }
 
@@ -280,8 +279,8 @@ class KafkaScanTrimmer {
    *
    * @return optimized kafka scan
    */
-  @VisibleForTesting static Map<TopicPartition, KafkaPullerInputSplit> buildScanFromOffsetPredicate(Map<TopicPartition,
-      KafkaPullerInputSplit> fullScan,
+  @VisibleForTesting static Map<TopicPartition, KafkaInputSplit> buildScanFromOffsetPredicate(Map<TopicPartition,
+      KafkaInputSplit> fullScan,
       PredicateLeaf.Operator operator,
       long offsetConst,
       boolean flip,
@@ -320,54 +319,50 @@ class KafkaScanTrimmer {
       endOffset = -1;
     }
 
-    final Map<TopicPartition, KafkaPullerInputSplit> newScan = new HashMap<>();
+    final Map<TopicPartition, KafkaInputSplit> newScan = new HashMap<>();
 
     fullScan.forEach((tp, existingInputSplit) -> {
-      final KafkaPullerInputSplit newInputSplit;
+      final KafkaInputSplit newInputSplit;
       if (startOffset != -1 && endOffset == -1) {
-        newInputSplit = new KafkaPullerInputSplit(tp.topic(),
+        newInputSplit = new KafkaInputSplit(tp.topic(),
             tp.partition(),
             //if the user asks for a start offset > max offset, replace it with the last offset
             Math.min(startOffset, existingInputSplit.getEndOffset()),
             existingInputSplit.getEndOffset(),
             existingInputSplit.getPath());
       } else if (endOffset != -1 && startOffset == -1) {
-        newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(), existingInputSplit.getStartOffset(),
+        newInputSplit = new KafkaInputSplit(tp.topic(), tp.partition(), existingInputSplit.getStartOffset(),
             //@TODO check this: if the user asks for a non-existing end offset, ignore it and position the head at the start
             // This can be an issue when ingesting from Kafka into Hive: what happens if there are gaps?
             // Shall we fail the ingest, or carry on and ignore non-existing offsets?
             Math.max(endOffset, existingInputSplit.getStartOffset()), existingInputSplit.getPath());
       } else if (endOffset == startOffset + 1) {
         if (startOffset < existingInputSplit.getStartOffset() || startOffset >= existingInputSplit.getEndOffset()) {
-          newInputSplit = new KafkaPullerInputSplit(tp.topic(), tp.partition(),
+          newInputSplit = new KafkaInputSplit(tp.topic(), tp.partition(),
               // non existing offset will be seeking last offset
               existingInputSplit.getEndOffset(), existingInputSplit.getEndOffset(), existingInputSplit.getPath());
         } else {
           newInputSplit =
-              new KafkaPullerInputSplit(tp.topic(),
-                  tp.partition(),
-                  startOffset,
-                  endOffset,
-                  existingInputSplit.getPath());
+              new KafkaInputSplit(tp.topic(), tp.partition(), startOffset, endOffset, existingInputSplit.getPath());
         }
 
       } else {
         newInputSplit =
-            new KafkaPullerInputSplit(tp.topic(),
+            new KafkaInputSplit(tp.topic(),
                 tp.partition(),
                 existingInputSplit.getStartOffset(),
                 existingInputSplit.getEndOffset(),
                 existingInputSplit.getPath());
       }
 
-      newScan.put(tp, KafkaPullerInputSplit.intersectRange(newInputSplit, existingInputSplit));
+      newScan.put(tp, KafkaInputSplit.intersectRange(newInputSplit, existingInputSplit));
     });
 
     return newScan;
   }
 
-  @Nullable private static Map<TopicPartition, KafkaPullerInputSplit> buildScanForTimesPredicate(
-      Map<TopicPartition, KafkaPullerInputSplit> fullHouse,
+  @Nullable private static Map<TopicPartition, KafkaInputSplit> buildScanForTimesPredicate(
+      Map<TopicPartition, KafkaInputSplit> fullHouse,
       PredicateLeaf.Operator operator,
       long timestamp,
       boolean flip,
@@ -385,11 +380,11 @@ class KafkaScanTrimmer {
         // NULL will be returned for that partition if the message format version in the partition is before 0.10.0
         Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(timePartitionsMap);
         return Maps.toMap(fullHouse.keySet(), tp -> {
-          KafkaPullerInputSplit existing = fullHouse.get(tp);
+          KafkaInputSplit existing = fullHouse.get(tp);
           OffsetAndTimestamp foundOffsetAndTime = offsetAndTimestamp.get(tp);
           //Null in case the filter doesn't match or the field doesn't exist (i.e. old broker), thus return an empty scan.
           final long startOffset = foundOffsetAndTime == null ? existing.getEndOffset() : foundOffsetAndTime.offset();
-          return new KafkaPullerInputSplit(Objects.requireNonNull(tp).topic(),
+          return new KafkaInputSplit(Objects.requireNonNull(tp).topic(),
               tp.partition(),
               startOffset,
               existing.getEndOffset(),
@@ -410,21 +405,21 @@ class KafkaScanTrimmer {
    *
    * @return either full scan or an optimized sub scan.
    */
-  private Map<TopicPartition, KafkaPullerInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr) {
-    Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+  private Map<TopicPartition, KafkaInputSplit> pushAndOp(ExprNodeGenericFuncDesc expr) {
+    Map<TopicPartition, KafkaInputSplit> currentScan = new HashMap<>();
 
-    fullHouse.forEach((tp, input) -> currentScan.put(tp, KafkaPullerInputSplit.copyOf(input)));
+    fullHouse.forEach((tp, input) -> currentScan.put(tp, KafkaInputSplit.copyOf(input)));
 
     for (ExprNodeDesc child : expr.getChildren()) {
-      Map<TopicPartition, KafkaPullerInputSplit> scan = parseAndOptimize(child);
+      Map<TopicPartition, KafkaInputSplit> scan = parseAndOptimize(child);
       if (scan != null) {
         Set<TopicPartition> currentKeys = ImmutableSet.copyOf(currentScan.keySet());
         currentKeys.forEach(key -> {
-          KafkaPullerInputSplit newSplit = scan.get(key);
-          KafkaPullerInputSplit oldSplit = currentScan.get(key);
+          KafkaInputSplit newSplit = scan.get(key);
+          KafkaInputSplit oldSplit = currentScan.get(key);
           currentScan.remove(key);
           if (newSplit != null) {
-            KafkaPullerInputSplit intersectionSplit = KafkaPullerInputSplit.intersectRange(newSplit, oldSplit);
+            KafkaInputSplit intersectionSplit = KafkaInputSplit.intersectRange(newSplit, oldSplit);
             if (intersectionSplit != null) {
               currentScan.put(key, intersectionSplit);
             }
@@ -436,18 +431,18 @@ class KafkaScanTrimmer {
     return currentScan;
   }
 
-  @Nullable private Map<TopicPartition, KafkaPullerInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr) {
-    final Map<TopicPartition, KafkaPullerInputSplit> currentScan = new HashMap<>();
+  @Nullable private Map<TopicPartition, KafkaInputSplit> pushOrOp(ExprNodeGenericFuncDesc expr) {
+    final Map<TopicPartition, KafkaInputSplit> currentScan = new HashMap<>();
     for (ExprNodeDesc child : expr.getChildren()) {
-      Map<TopicPartition, KafkaPullerInputSplit> scan = parseAndOptimize(child);
+      Map<TopicPartition, KafkaInputSplit> scan = parseAndOptimize(child);
       if (scan == null) {
         // if any of the children is unknown, bail out
         return null;
       }
 
       scan.forEach((tp, input) -> {
-        KafkaPullerInputSplit existingSplit = currentScan.get(tp);
-        currentScan.put(tp, KafkaPullerInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
+        KafkaInputSplit existingSplit = currentScan.get(tp);
+        currentScan.put(tp, KafkaInputSplit.unionRange(input, existingSplit == null ? input : existingSplit));
       });
     }
     return currentScan;
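
To make the AND/OR pruning above concrete: each supported predicate is reduced to an offset range per partition, AND intersects the ranges and OR unions them. A small sketch, not part of the patch, with a hypothetical topic and offsets, using the intersectRange/unionRange helpers referenced in the code:

    package org.apache.hadoop.hive.kafka;

    import org.apache.hadoop.fs.Path;

    public class KafkaScanTrimSketch {
      public static void main(String[] args) {
        Path path = new Path("/tmp/kafka_table");  // split bookkeeping only
        // Predicate `__offset >= 100` over a full scan of [0, 1000) gives [100, 1000);
        // predicate `__offset < 500` gives [0, 500).
        KafkaInputSplit geHundred = new KafkaInputSplit("test_topic", 0, 100L, 1000L, path);
        KafkaInputSplit ltFiveHundred = new KafkaInputSplit("test_topic", 0, 0L, 500L, path);

        // AND of the two predicates -> intersection of the ranges: [100, 500).
        KafkaInputSplit and = KafkaInputSplit.intersectRange(geHundred, ltFiveHundred);
        // OR of the two predicates -> union of the ranges: [0, 1000).
        KafkaInputSplit or = KafkaInputSplit.unionRange(geHundred, ltFiveHundred);
        System.out.println(and + " / " + or);
      }
    }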

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
new file mode 100644
index 0000000..51cfa24
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.rmi.server.UID;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Generic Kafka SerDe that allows the user to delegate the actual SerDe work to another class, such as the Avro
+ * or JSON SerDe, or any class that supports {@link BytesWritable}.
+ * If users wish to implement their own SerDe, all they need is a SerDe that extends
+ * {@link org.apache.hadoop.hive.serde2.AbstractSerDe} and accepts {@link BytesWritable} as the value.
+ */
+@SerDeSpec(schemaProps = { serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES }) public class KafkaSerDe
+    extends AbstractSerDe {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSerDe.class);
+
+  /**
+   * Delegate SerDe used to serialize and deserialize data from/to Kafka.
+   */
+  private AbstractSerDe delegateSerDe;
+
+  /**
+   * Delegate Object Inspector used to deserialize the row; this OI is constructed by the {@code delegateSerDe}.
+   */
+  private StructObjectInspector delegateDeserializerOI;
+
+  /**
+   * Delegate Object Inspector used to Serialize the row as byte array.
+   */
+  private StructObjectInspector delegateSerializerOI;
+
+  /**
+   * Object Inspector of original row plus metadata.
+   */
+  private ObjectInspector objectInspector;
+  private final List<String> columnNames = Lists.newArrayList();
+  private BytesConverter bytesConverter;
+
+  @Override public void initialize(@Nullable Configuration conf, Properties tbl) throws SerDeException {
+    //This method is called before {@link org.apache.hadoop.hive.kafka.KafkaStorageHandler.preCreateTable}
+    //Thus we need to default to org.apache.hadoop.hive.kafka.KafkaUtils.DEFAULT_PROPERTIES if any property is needed
+    final String
+        className =
+        tbl.getProperty(KafkaTableProperties.SERDE_CLASS_NAME.getName(),
+            KafkaTableProperties.SERDE_CLASS_NAME.getDefaultValue());
+    delegateSerDe = KafkaUtils.createDelegate(className);
+    //noinspection deprecation
+    delegateSerDe.initialize(conf, tbl);
+
+    if (!(delegateSerDe.getObjectInspector() instanceof StructObjectInspector)) {
+      throw new SerDeException("Was expecting Struct Object Inspector but have " + delegateSerDe.getObjectInspector()
+          .getClass()
+          .getName());
+    }
+    delegateDeserializerOI = (StructObjectInspector) delegateSerDe.getObjectInspector();
+
+    // Build the column names; order matters here.
+    columnNames.addAll(delegateDeserializerOI.getAllStructFieldRefs()
+        .stream()
+        .map(StructField::getFieldName)
+        .collect(Collectors.toList()));
+    columnNames.addAll(MetadataColumn.KAFKA_METADATA_COLUMN_NAMES);
+
+    final List<ObjectInspector> inspectors = new ArrayList<>(columnNames.size());
+    inspectors.addAll(delegateDeserializerOI.getAllStructFieldRefs()
+        .stream()
+        .map(StructField::getFieldObjectInspector)
+        .collect(Collectors.toList()));
+    inspectors.addAll(MetadataColumn.KAFKA_METADATA_INSPECTORS);
+    objectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+
+    // Setup Read and Write Path From/To Kafka
+    if (delegateSerDe.getSerializedClass() == Text.class) {
+      bytesConverter = new TextBytesConverter();
+    } else if (delegateSerDe.getSerializedClass() == AvroGenericRecordWritable.class) {
+      String schemaFromProperty = tbl.getProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), "");
+      Preconditions.checkArgument(!schemaFromProperty.isEmpty(), "Avro schema is empty, can not go further");
+      Schema schema = AvroSerdeUtils.getSchemaFor(schemaFromProperty);
+      LOG.debug("Building Avro Reader with schema {}", schemaFromProperty);
+      bytesConverter = new AvroBytesConverter(schema);
+    } else {
+      bytesConverter = new BytesWritableConverter();
+    }
+  }
+
+  @Override public Class<? extends Writable> getSerializedClass() {
+    return delegateSerDe.getSerializedClass();
+  }
+
+  @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+
+    if (!(objInspector instanceof StructObjectInspector)) {
+      throw new SerDeException("Object inspector has to be "
+          + StructObjectInspector.class.getName()
+          + " but got "
+          + objInspector.getClass().getName());
+    }
+    StructObjectInspector structObjectInspector = (StructObjectInspector) objInspector;
+    List<Object> data = structObjectInspector.getStructFieldsDataAsList(obj);
+    if (delegateSerializerOI == null) {
+      //@TODO check if i can cache this if it is the same.
+      delegateSerializerOI =
+          new SubStructObjectInspector(structObjectInspector, data.size() - MetadataColumn.values().length);
+    }
+    // We always append the metadata columns to the end of the row.
+    final List<Object> row = data.subList(0, data.size() - MetadataColumn.values().length);
+    //@TODO @FIXME use column names instead of raw positions, which are hard to read and review
+    Object key = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size());
+    Object partition = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 1);
+    Object offset = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 2);
+    Object timestamp = data.get(data.size() - MetadataColumn.KAFKA_METADATA_COLUMN_NAMES.size() + 3);
+
+    if (PrimitiveObjectInspectorUtils.getLong(offset, MetadataColumn.OFFSET.getObjectInspector()) != -1) {
+      LOG.error("Can not insert values into `__offset` column, has to be [-1]");
+      throw new SerDeException("Can not insert values into `__offset` column, has to be [-1]");
+    }
+
+    final byte[]
+        keyBytes =
+        key == null ? null : PrimitiveObjectInspectorFactory.writableBinaryObjectInspector.getPrimitiveJavaObject(key);
+    final long
+        recordTs =
+        timestamp == null ?
+            -1 :
+            PrimitiveObjectInspectorUtils.getLong(timestamp, MetadataColumn.TIMESTAMP.getObjectInspector());
+    final int
+        recordPartition =
+        partition == null ?
+            -1 :
+            PrimitiveObjectInspectorUtils.getInt(partition, MetadataColumn.PARTITION.getObjectInspector());
+
+    //noinspection unchecked
+    return new KafkaWritable(recordPartition,
+        recordTs,
+        bytesConverter.getBytes(delegateSerDe.serialize(row, delegateSerializerOI)),
+        keyBytes);
+  }
+
+  @Override public SerDeStats getSerDeStats() {
+    return delegateSerDe.getSerDeStats();
+  }
+
+  @Override public Object deserialize(Writable blob) throws SerDeException {
+    KafkaWritable record = (KafkaWritable) blob;
+    final Object row = delegateSerDe.deserialize(bytesConverter.getWritable(record.getValue()));
+    return columnNames.stream().map(name -> {
+      final MetadataColumn metadataColumn = MetadataColumn.forName(name);
+      if (metadataColumn != null) {
+        return record.getHiveWritable(metadataColumn);
+      }
+      return delegateDeserializerOI.getStructFieldData(row, delegateDeserializerOI.getStructFieldRef(name));
+    }).collect(Collectors.toList());
+  }
+
+  @Override public ObjectInspector getObjectInspector() {
+    return objectInspector;
+  }
+
+  /**
+   * Returns a view of the input object inspector's field list between:
+   * <tt>0</tt>, inclusive, and the specified <tt>toIndex</tt>, exclusive.
+   */
+  private static final class SubStructObjectInspector extends StructObjectInspector {
+
+    private final StructObjectInspector baseOI;
+    private final List<? extends StructField> structFields;
+
+    /**
+     * Returns a live view of the base Object Inspector, from 0, inclusive, to {@code toIndex}, exclusive.
+     * @param baseOI base Object Inspector.
+     * @param toIndex toIndex.
+     */
+    private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
+      this.baseOI = baseOI;
+      structFields = baseOI.getAllStructFieldRefs().subList(0, toIndex);
+    }
+
+    /**
+     * Returns all the fields.
+     */
+    @Override public List<? extends StructField> getAllStructFieldRefs() {
+      return structFields;
+    }
+
+    /**
+     * Look up a field.
+     * @param fieldName fieldName to be looked up.
+     */
+    @SuppressWarnings("OptionalGetWithoutIsPresent") @Override public StructField getStructFieldRef(String fieldName) {
+      return this.getAllStructFieldRefs()
+          .stream()
+          .filter(ref -> ref.getFieldName().equals(fieldName))
+          .findFirst()
+          .get();
+    }
+
+    /**
+     * returns null for data = null.
+     * @param data input.
+     * @param fieldRef field to extract.
+     */
+    @Override public Object getStructFieldData(Object data, StructField fieldRef) {
+      return baseOI.getStructFieldData(data, fieldRef);
+    }
+
+    /**
+     * returns null for data = null.
+     * @param data input data.
+     */
+    @Override public List<Object> getStructFieldsDataAsList(Object data) {
+      if (data == null) {
+        return null;
+      }
+      int size = getAllStructFieldRefs().size();
+      List<Object> res = new ArrayList<>(size);
+      for (int i = 0; i < size; i++) {
+        res.add(baseOI.getStructFieldData(data, getAllStructFieldRefs().get(i)));
+      }
+      return res;
+    }
+
+    /**
+     * Returns the name of the data type that is inspected by this
+     * ObjectInspector. This is used to display the type information to the user.
+     *
+     * For primitive types, the type name is standardized. For other types, the
+     * type name can be something like "list&lt;int&gt;", "map&lt;int,string&gt;", java class
+     * names, or user-defined type names similar to typedef.
+     */
+    @Override public String getTypeName() {
+      return baseOI.getTypeName();
+    }
+
+    /**
+     * An ObjectInspector must inherit from one of the following interfaces if
+     * getCategory() returns: PRIMITIVE: PrimitiveObjectInspector LIST:
+     * ListObjectInspector MAP: MapObjectInspector STRUCT: StructObjectInspector.
+     */
+    @Override public Category getCategory() {
+      return baseOI.getCategory();
+    }
+  }
+
+  /**
+   * Class that encapsulates the logic of serializing and deserializing byte arrays to/from the delegate writable format.
+   * @param <K> delegate writable class.
+   */
+  private interface BytesConverter<K extends Writable> {
+    byte[] getBytes(K writable);
+
+    K getWritable(byte[] value);
+  }
+
+  private static class AvroBytesConverter implements BytesConverter<AvroGenericRecordWritable> {
+    private final Schema schema;
+    private final DatumReader<GenericRecord> dataReader;
+    private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<>();
+    private final AvroGenericRecordWritable avroGenericRecordWritable = new AvroGenericRecordWritable();
+    private final UID uid = new UID();
+
+    AvroBytesConverter(Schema schema) {
+      this.schema = schema;
+      dataReader = new SpecificDatumReader<>(this.schema);
+    }
+
+    @Override public byte[] getBytes(AvroGenericRecordWritable writable) {
+      GenericRecord record = writable.getRecord();
+      byte[] valueBytes = null;
+      try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+        BinaryEncoder be = EncoderFactory.get().directBinaryEncoder(out, null);
+        gdw.setSchema(record.getSchema());
+        gdw.write(record, be);
+        out.flush();
+        valueBytes = out.toByteArray();
+      } catch (IOException e) {
+        Throwables.propagate(new SerDeException(e));
+      }
+      return valueBytes;
+    }
+
+    @Override public AvroGenericRecordWritable getWritable(byte[] value) {
+      GenericRecord avroRecord = null;
+      try {
+        avroRecord = dataReader.read(null, DecoderFactory.get().binaryDecoder(value, null));
+      } catch (IOException e) {
+        Throwables.propagate(new SerDeException(e));
+      }
+
+      avroGenericRecordWritable.setRecord(avroRecord);
+      avroGenericRecordWritable.setRecordReaderID(uid);
+      avroGenericRecordWritable.setFileSchema(avroRecord.getSchema());
+      return avroGenericRecordWritable;
+    }
+  }
+
+  private static class BytesWritableConverter implements BytesConverter<BytesWritable> {
+    @Override public byte[] getBytes(BytesWritable writable) {
+      return writable.getBytes();
+    }
+
+    @Override public BytesWritable getWritable(byte[] value) {
+      return new BytesWritable(value);
+    }
+  }
+
+  private static class TextBytesConverter implements BytesConverter<Text> {
+    Text text = new Text();
+    @Override public byte[] getBytes(Text writable) {
+      //@TODO There is no real reason to decode then re-encode the string to bytes.
+      //@FIXME Works around the CTRL-CHAR ^0 that Text appends at the end of the string, which the JSON SerDe does not like.
+      try {
+        return writable.decode(writable.getBytes(), 0, writable.getLength()).getBytes(Charset.forName("UTF-8"));
+      } catch (CharacterCodingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override public Text getWritable(byte[] value) {
+      text.set(value);
+      return text;
+    }
+  }
+
+}
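
The AvroBytesConverter above is essentially a plain Avro binary round-trip (no container-file header, just the binary-encoded body). A standalone sketch of that round-trip, with a hypothetical schema standing in for the table's avro.schema.literal, can help when checking what the SerDe writes as the Kafka record value; the converter uses a SpecificDatumReader, while a GenericDatumReader behaves the same in a schema-only setup:

    import java.io.ByteArrayOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class AvroRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical schema with two fields.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"count\",\"type\":\"long\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "click");
        record.put("count", 42L);

        // Encode, mirroring AvroBytesConverter.getBytes(): schema-less binary body.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        writer.write(record, encoder);
        encoder.flush();
        byte[] kafkaValue = out.toByteArray();  // what ends up as the Kafka record value

        // Decode, mirroring AvroBytesConverter.getWritable().
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        GenericRecord decoded = reader.read(null, DecoderFactory.get().binaryDecoder(kafkaValue, null));
        System.out.println(decoded);
      }
    }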

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index 96222c9..0d64cd9 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -18,10 +18,21 @@
 
 package org.apache.hadoop.hive.kafka;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -31,22 +42,29 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 /**
- * Hive Kafka storage handler to allow user querying Stream of tuples from a Kafka queue.
+ * Hive Kafka storage handler that allows users to read from and write to the Kafka message bus.
  */
-public class KafkaStorageHandler implements HiveStorageHandler {
+@SuppressWarnings("ALL") public class KafkaStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaStorageHandler.class);
   private static final String KAFKA_STORAGE_HANDLER = "org.apache.hadoop.hive.kafka.KafkaStorageHandler";
@@ -54,19 +72,19 @@ public class KafkaStorageHandler implements HiveStorageHandler {
   private Configuration configuration;
 
   @Override public Class<? extends InputFormat> getInputFormatClass() {
-    return KafkaPullerInputFormat.class;
+    return KafkaInputFormat.class;
   }
 
   @Override public Class<? extends OutputFormat> getOutputFormatClass() {
-    return NullOutputFormat.class;
+    return KafkaOutputFormat.class;
   }
 
   @Override public Class<? extends AbstractSerDe> getSerDeClass() {
-    return GenericKafkaSerDe.class;
+    return KafkaSerDe.class;
   }
 
   @Override public HiveMetaHook getMetaHook() {
-    return null;
+    return this;
   }
 
   @Override public HiveAuthorizationProvider getAuthorizationProvider() {
@@ -74,43 +92,33 @@ public class KafkaStorageHandler implements HiveStorageHandler {
   }
 
   @Override public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-    String topic = tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, "");
+    configureCommonProperties(tableDesc, jobProperties);
+  }
+
+  private void configureCommonProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    String topic = tableDesc.getProperties().getProperty(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName(), "");
     if (topic.isEmpty()) {
       throw new IllegalArgumentException("Kafka topic missing set table property->"
-          + KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
+          + KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
     }
-    jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_TOPIC, topic);
-    String brokerString = tableDesc.getProperties().getProperty(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, "");
+    jobProperties.put(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName(), topic);
+    String
+        brokerString =
+        tableDesc.getProperties().getProperty(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(), "");
     if (brokerString.isEmpty()) {
       throw new IllegalArgumentException("Broker address missing set table property->"
-          + KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+          + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    }
+    jobProperties.put(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(), brokerString);
+    Arrays.stream(KafkaTableProperties.values())
+        .filter(tableProperty -> !tableProperty.isMandatory())
+        .forEach(tableProperty -> jobProperties.put(tableProperty.getName(),
+            tableDesc.getProperties().getProperty(tableProperty.getName())));
+    // If the user asks for exactly-once semantics, restrict the consumer to read only committed records.
+    if (jobProperties.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName())
+        .equals(KafkaOutputFormat.WriteSemantic.EXACTLY_ONCE.name())) {
+      jobProperties.put("kafka.consumer.isolation.level", "read_committed");
     }
-    jobProperties.put(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS, brokerString);
-
-    jobProperties.put(KafkaStreamingUtils.SERDE_CLASS_NAME,
-        tableDesc.getProperties().getProperty(KafkaStreamingUtils.SERDE_CLASS_NAME, KafkaJsonSerDe.class.getName()));
-    LOG.debug("Table properties: SerDe class name {}", jobProperties.get(KafkaStreamingUtils.SERDE_CLASS_NAME));
-
-    //set extra properties
-    tableDesc.getProperties()
-        .entrySet()
-        .stream()
-        .filter(objectObjectEntry -> objectObjectEntry.getKey()
-            .toString()
-            .toLowerCase()
-            .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX))
-        .forEach(entry -> {
-          String
-              key =
-              entry.getKey().toString().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1);
-          if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) {
-            throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
-          }
-          String value = entry.getValue().toString();
-          jobProperties.put(key, value);
-          LOG.info("Setting extra job properties: key [{}] -> value [{}]", key, value);
-
-        });
   }
 
   @Override public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> secrets) {
@@ -118,19 +126,21 @@ public class KafkaStorageHandler implements HiveStorageHandler {
   }
 
   @Override public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-
+    configureCommonProperties(tableDesc, jobProperties);
   }
 
   @Override public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
     configureInputJobProperties(tableDesc, jobProperties);
+    configureOutputJobProperties(tableDesc, jobProperties);
   }
 
   @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
     Map<String, String> properties = new HashMap<>();
     configureInputJobProperties(tableDesc, properties);
+    configureOutputJobProperties(tableDesc, properties);
     properties.forEach(jobConf::set);
     try {
-      KafkaStreamingUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class);
+      KafkaUtils.copyDependencyJars(jobConf, KafkaStorageHandler.class);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -149,11 +159,11 @@ public class KafkaStorageHandler implements HiveStorageHandler {
   }
 
   @Override public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
-    String topic = table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_TOPIC);
+    String topic = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
     if (topic == null || topic.isEmpty()) {
       throw new MetaException("topic is null or empty");
     }
-    String brokers = table.getParameters().get(KafkaStreamingUtils.HIVE_KAFKA_BOOTSTRAP_SERVERS);
+    String brokers = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
     if (brokers == null || brokers.isEmpty()) {
       throw new MetaException("kafka brokers string is null or empty");
     }
@@ -161,19 +171,232 @@ public class KafkaStorageHandler implements HiveStorageHandler {
     properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
     properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
     properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, Utilities.getTaskId(getConf()));
     table.getParameters()
         .entrySet()
         .stream()
         .filter(objectObjectEntry -> objectObjectEntry.getKey()
             .toLowerCase()
-            .startsWith(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX))
+            .startsWith(KafkaUtils.CONSUMER_CONFIGURATION_PREFIX))
         .forEach(entry -> {
-          String key = entry.getKey().substring(KafkaStreamingUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1);
-          if (KafkaStreamingUtils.FORBIDDEN_PROPERTIES.contains(key)) {
+          String key = entry.getKey().substring(KafkaUtils.CONSUMER_CONFIGURATION_PREFIX.length() + 1);
+          if (KafkaUtils.FORBIDDEN_PROPERTIES.contains(key)) {
             throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
           }
           properties.put(key, entry.getValue());
         });
     return new KafkaStorageHandlerInfo(topic, properties);
   }
+
+  private Properties buildProducerProperties(Table table) {
+    String brokers = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    if (brokers == null || brokers.isEmpty()) {
+      throw new RuntimeException("kafka brokers string is null or empty");
+    }
+    final Properties properties = new Properties();
+    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    table.getParameters()
+        .entrySet()
+        .stream()
+        .filter(objectObjectEntry -> objectObjectEntry.getKey()
+            .toLowerCase()
+            .startsWith(KafkaUtils.PRODUCER_CONFIGURATION_PREFIX))
+        .forEach(entry -> {
+          String key = entry.getKey().substring(KafkaUtils.PRODUCER_CONFIGURATION_PREFIX.length() + 1);
+          if (KafkaUtils.FORBIDDEN_PROPERTIES.contains(key)) {
+            throw new IllegalArgumentException("Not supposed to set Kafka Property " + key);
+          }
+          properties.put(key, entry.getValue());
+        });
+    return properties;
+  }
+
+  @Override public LockType getLockType(WriteEntity writeEntity) {
+    if (writeEntity.getWriteType().equals(WriteEntity.WriteType.INSERT)) {
+      return LockType.SHARED_READ;
+    }
+    return LockType.SHARED_WRITE;
+  }
+
+  private String getQueryId() {
+    return HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID);
+  }
+
+  @Override public void commitInsertTable(Table table, boolean overwrite) throws MetaException {
+    boolean
+        isExactlyOnce =
+        table.getParameters()
+            .get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName())
+            .equals(KafkaOutputFormat.WriteSemantic.EXACTLY_ONCE.name());
+    String optimisticCommitVal = table.getParameters().get(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName());
+    boolean isTwoPhaseCommit = !Boolean.parseBoolean(optimisticCommitVal);
+    if (!isExactlyOnce || !isTwoPhaseCommit) {
+      // Not an exactly-once two-phase commit, therefore there is no open transaction to handle.
+      return;
+    }
+
+    final Path queryWorkingDir = getQueryWorkingDir(table);
+    final Map<String, Pair<Long, Short>> transactionsMap;
+    final int maxTries = Integer.parseInt(table.getParameters().get(KafkaTableProperties.MAX_RETRIES.getName()));
+    // We have 4 stages ahead of us:
+    // 1. Fetch the transaction states from HDFS.
+    // 2. Build/init all the Kafka producers and perform a pre-commit call to check whether we can go ahead with the commit.
+    // 3. Commit the transactions one by one.
+    // 4. Clean the query working directory.
+
+    // First stage: fetch the transaction states.
+    final RetryUtils.Task<Map<String, Pair<Long, Short>>>
+        fetchTransactionStates =
+        new RetryUtils.Task<Map<String, Pair<Long, Short>>>() {
+          @Override public Map<String, Pair<Long, Short>> perform() throws Exception {
+            return TransactionalKafkaWriter.getTransactionsState(FileSystem.get(getConf()), queryWorkingDir);
+          }
+        };
+
+    try {
+      transactionsMap = RetryUtils.retry(fetchTransactionStates, (error) -> (error instanceof IOException), maxTries);
+    } catch (Exception e) {
+      // Cannot go further.
+      LOG.error("Cannot fetch transaction states due to [{}]", e.getMessage());
+      throw new MetaException(e.getMessage());
+    }
+
+    // Second stage: resume the producers and pre-commit.
+    final Properties baseProducerProps = buildProducerProperties(table);
+    final Map<String, HiveKafkaProducer> producersMap = new HashMap<>();
+    final RetryUtils.Task<Void> buildProducersTask = new RetryUtils.Task<Void>() {
+      @Override public Void perform() throws Exception {
+        assert producersMap.size() == 0;
+        transactionsMap.forEach((key, value) -> {
+          // Base producer properties are still missing the transactional id, therefore set it here.
+          baseProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, key);
+          HiveKafkaProducer<byte[], byte[]> producer = new HiveKafkaProducer<>(baseProducerProps);
+          producer.resumeTransaction(value.getLeft(), value.getRight());
+          // This is a dummy RPC call to ensure that the producer is still resumable and to signal the pre-commit as per:
+          // https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka#EndPhase
+          producer.sendOffsetsToTransaction(ImmutableMap.of(), "__dry_run");
+          producersMap.put(key, producer);
+        });
+        return null;
+      }
+    };
+
+    RetryUtils.CleanupAfterFailure cleanUpTheMap = new RetryUtils.CleanupAfterFailure() {
+      @Override public void cleanup() {
+        producersMap.forEach((s, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+        producersMap.clear();
+      }
+    };
+    final Predicate<Throwable>
+        isRetriable = (error) -> !KafkaUtils.exceptionIsFatal(error) && !(error instanceof ProducerFencedException);
+    try {
+      RetryUtils.retry(buildProducersTask, isRetriable, cleanUpTheMap, maxTries, "Error while building producers");
+    } catch (Exception e) {
+      // Cannot go further.
+      LOG.error("Cannot build producers due to [{}]", e.getMessage());
+      throw new MetaException(e.getMessage());
+    }
+
+    // Third stage: commit the transactions; this part is the actual critical section.
+    // The commit might be retried on error, but keep in mind that in some cases, e.g. when an open transaction
+    // expires after the 15 minute timeout, it is not possible to go further.
+    final Set<String> committedTx = new HashSet<>();
+    final RetryUtils.Task<Void> commitTask = new RetryUtils.Task() {
+      @Override public Object perform() throws Exception {
+        producersMap.forEach((key, producer) -> {
+          if (!committedTx.contains(key)) {
+            producer.commitTransaction();
+            committedTx.add(key);
+            producer.close();
+            LOG.info("Committed Transaction [{}]", key);
+          }
+        });
+        return null;
+      }
+    };
+
+    try {
+      RetryUtils.retry(commitTask, isRetriable, maxTries);
+    } catch (Exception e) {
+      // At this point we are in a broken state if at least one commit happened: close the producers and log it.
+      producersMap.forEach((key, producer) -> producer.close(0, TimeUnit.MILLISECONDS));
+      LOG.error("Commit transaction failed", e);
+      if (committedTx.size() > 0) {
+        LOG.error("Partial Data Got Commited Some actions need to be Done");
+        committedTx.stream().forEach(key -> LOG.error("Transaction [{}] is an orphen commit", key));
+      }
+      throw new MetaException(e.getMessage());
+    }
+
+    // Fourth stage: clean the query working directory.
+    final RetryUtils.Task<Void> cleanQueryDirTask = new RetryUtils.Task<Void>() {
+      @Override public Void perform() throws Exception {
+        cleanWorkingDirectory(queryWorkingDir);
+        return null;
+      }
+    };
+    try {
+      RetryUtils.retry(cleanQueryDirTask, (error) -> error instanceof IOException, maxTries);
+    } catch (Exception e) {
+      // Just log it; the commit has already succeeded.
+      LOG.error("Failed to clean query working directory [{}] due to [{}]", queryWorkingDir, e.getMessage());
+    }
+  }
+
+  @Override public void preInsertTable(Table table, boolean overwrite) throws MetaException {
+    if (overwrite) {
+      throw new MetaException("Kafa Table does not support the overwite SQL Smentic");
+    }
+  }
+
+  @Override public void rollbackInsertTable(Table table, boolean overwrite) throws MetaException {
+
+  }
+
+  @Override public void preCreateTable(Table table) throws MetaException {
+    if (!table.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) {
+      throw new MetaException(KAFKA_STORAGE_HANDLER + " supports only " + TableType.EXTERNAL_TABLE);
+    }
+    Arrays.stream(KafkaTableProperties.values())
+        .filter(KafkaTableProperties::isMandatory)
+        .forEach(key -> Preconditions.checkNotNull(table.getParameters().get(key.getName()),
+            "Set Table property " + key.getName()));
+    // Put all the default at the pre create.
+    Arrays.stream(KafkaTableProperties.values()).forEach((key) -> {
+      if (table.getParameters().get(key.getName()) == null) {
+        table.putToParameters(key.getName(), key.getDefaultValue());
+      }
+    });
+  }
+
+  @Override public void rollbackCreateTable(Table table) throws MetaException {
+
+  }
+
+  @Override public void commitCreateTable(Table table) throws MetaException {
+    commitInsertTable(table, false);
+  }
+
+  @Override public void preDropTable(Table table) throws MetaException {
+
+  }
+
+  @Override public void rollbackDropTable(Table table) throws MetaException {
+
+  }
+
+  @Override public void commitDropTable(Table table, boolean deleteData) throws MetaException {
+
+  }
+
+  private Path getQueryWorkingDir(Table table) {
+    return new Path(table.getSd().getLocation(), getQueryId());
+  }
+
+  private void cleanWorkingDirectory(Path queryWorkingDir) throws IOException {
+    FileSystem fs = FileSystem.get(getConf());
+    fs.delete(queryWorkingDir, true);
+  }
 }
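For readers following the commitInsertTable() flow above, the per-transaction resume / pre-commit / commit sequence boils down to the minimal sketch below. It is illustrative only and not part of the patch: the broker address, transactional id and (producerId, epoch) values are hypothetical placeholders, and error handling is reduced to the close-without-flush fallback used above.

package org.apache.hadoop.hive.kafka;

import com.google.common.collect.ImmutableMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/** Illustrative sketch only; placed in the handler package because HiveKafkaProducer is package-private. */
public class ResumeAndCommitSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical values; in the handler they come from the task-side transaction state files.
    String transactionalId = "hypothetical_query_id_task_0";
    long producerId = 4213L;
    short epoch = (short) 7;

    // Same shape as buildProducerProperties(), with a hypothetical broker address.
    Properties props = new Properties();
    props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);

    HiveKafkaProducer<byte[], byte[]> producer = new HiveKafkaProducer<>(props);
    try {
      // Re-attach to the transaction that the write task left open.
      producer.resumeTransaction(producerId, epoch);
      // Dummy call used as a pre-commit liveness check, exactly as in commitInsertTable().
      producer.sendOffsetsToTransaction(ImmutableMap.of(), "__dry_run");
      producer.commitTransaction();
      producer.close();
    } catch (Exception e) {
      // Abandon the producer without flushing, as the handler does on failure.
      producer.close(0, TimeUnit.MILLISECONDS);
      throw e;
    }
  }
}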

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
deleted file mode 100644
index 4802c4e..0000000
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStreamingUtils.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.ReflectionUtil;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Constant, Table properties, Utilities class.
- */
-final class KafkaStreamingUtils {
-
-  /**
-   * MANDATORY Table property indicating kafka topic backing the table.
-   */
-  static final String HIVE_KAFKA_TOPIC = "kafka.topic";
-  /**
-   * MANDATORY Table property indicating kafka broker(s) connection string.
-   */
-  static final String HIVE_KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
-  /**
-   * Table property indicating which delegate serde to be used, NOT MANDATORY defaults to {@link KafkaJsonSerDe}
-   */
-  static final String SERDE_CLASS_NAME = "kafka.serde.class";
-  /**
-   * Table property indicating poll/fetch timeout period in millis.
-   * FYI this is independent from internal Kafka consumer timeouts, defaults to {@DEFAULT_CONSUMER_POLL_TIMEOUT_MS}.
-   */
-  static final String HIVE_KAFKA_POLL_TIMEOUT = "hive.kafka.poll.timeout.ms";
-  /**
-   * Default poll timeout for fetching metadata and record batch.
-   */
-  static final long DEFAULT_CONSUMER_POLL_TIMEOUT_MS = 5000L; // 5 seconds
-  /**
-   * Table property prefix used to inject kafka consumer properties, e.g "kafka.consumer.max.poll.records" = "5000"
-   * this will lead to inject max.poll.records=5000 to the Kafka Consumer. NOT MANDATORY defaults to nothing
-   */
-  static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
-  /**
-   * Set of Kafka properties that the user can not set via DDLs.
-   */
-  static final HashSet<String> FORBIDDEN_PROPERTIES =
-      new HashSet<>(ImmutableList.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
-
-  private KafkaStreamingUtils() {
-  }
-
-  /**
-   * @param configuration Job configs
-   *
-   * @return default consumer properties
-   */
-  static Properties consumerProperties(Configuration configuration) {
-    final Properties props = new Properties();
-    // we are managing the commit offset
-    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    // we are seeking in the stream so no reset
-    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
-    String brokerEndPoint = configuration.get(HIVE_KAFKA_BOOTSTRAP_SERVERS);
-    if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
-      throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config "
-          + HIVE_KAFKA_BOOTSTRAP_SERVERS);
-    }
-    props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
-    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    // user can always override stuff
-    final Map<String, String>
-        kafkaProperties =
-        configuration.getValByRegex("^" + CONSUMER_CONFIGURATION_PREFIX + "\\..*");
-    for (Map.Entry<String, String> entry : kafkaProperties.entrySet()) {
-      String key = entry.getKey().substring(CONSUMER_CONFIGURATION_PREFIX.length() + 1);
-      if (FORBIDDEN_PROPERTIES.contains(key)) {
-        throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
-      }
-      props.setProperty(key, entry.getValue());
-    }
-    return props;
-  }
-
-  static void copyDependencyJars(Configuration conf, Class<?>... classes) throws IOException {
-    Set<String> jars = new HashSet<>();
-    FileSystem localFs = FileSystem.getLocal(conf);
-    jars.addAll(conf.getStringCollection("tmpjars"));
-    jars.addAll(Arrays.stream(classes).filter(Objects::nonNull).map(clazz -> {
-      String path = Utilities.jarFinderGetJar(clazz);
-      if (path == null) {
-        throw new RuntimeException("Could not find jar for class " + clazz + " in order to ship it to the cluster.");
-      }
-      try {
-        if (!localFs.exists(new Path(path))) {
-          throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      return path;
-    }).collect(Collectors.toList()));
-
-    if (jars.isEmpty()) {
-      return;
-    }
-    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
-  }
-
-  static AbstractSerDe createDelegate(String className) {
-    final Class<? extends AbstractSerDe> clazz;
-    try {
-      //noinspection unchecked
-      clazz = (Class<? extends AbstractSerDe>) Class.forName(className);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-    // we are not setting conf thus null is okay
-    return ReflectionUtil.newInstance(clazz, null);
-  }
-
-  /**
-   * Basic Enum class for all the metadata columns appended to the Kafka row by the deserializer.
-   */
-  enum MetadataColumn {
-    /**
-     * Record offset column name added as extra metadata column to row as long.
-     */
-    OFFSET("__offset", TypeInfoFactory.longTypeInfo),
-    /**
-     * Record Kafka Partition column name added as extra meta column of type int.
-     */
-    PARTITION("__partition", TypeInfoFactory.intTypeInfo),
-    /**
-     * Record Kafka key column name added as extra meta column of type binary blob.
-     */
-    KEY("__key", TypeInfoFactory.binaryTypeInfo),
-    /**
-     * Record Timestamp column name, added as extra meta column of type long.
-     */
-    TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo),
-    /**
-     * Start offset given by the input split, this will reflect the actual start of TP or start given by split pruner.
-     */
-    START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo),
-    /**
-     * End offset given by input split at run time.
-     */
-    END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo);
-
-    private final String name;
-    private final TypeInfo typeInfo;
-
-    MetadataColumn(String name, TypeInfo typeInfo) {
-      this.name = name;
-      this.typeInfo = typeInfo;
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public AbstractPrimitiveWritableObjectInspector getObjectInspector() {
-      return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
-          typeInfo.getTypeName()));
-    }
-  }
-
-  //Order at which column and types will be appended to the original row.
-  /**
-   * Kafka metadata columns order list.
-   */
-  private static final List<MetadataColumn> KAFKA_METADATA_COLUMNS =
-      Arrays.asList(MetadataColumn.KEY,
-          MetadataColumn.PARTITION,
-          MetadataColumn.OFFSET,
-          MetadataColumn.TIMESTAMP,
-          MetadataColumn.START_OFFSET,
-          MetadataColumn.END_OFFSET);
-
-  /**
-   * Kafka metadata column names.
-   */
-  static final List<String> KAFKA_METADATA_COLUMN_NAMES = KAFKA_METADATA_COLUMNS
-      .stream()
-      .map(MetadataColumn::getName)
-      .collect(Collectors.toList());
-
-  /**
-   * Kafka metadata column inspectors.
-   */
-  static final List<ObjectInspector> KAFKA_METADATA_INSPECTORS = KAFKA_METADATA_COLUMNS
-      .stream()
-      .map(MetadataColumn::getObjectInspector)
-      .collect(Collectors.toList());
-
-  /**
-   * Reverse lookup map used to convert records from kafka Writable to hive Writable based on Kafka semantic.
-   */
-  static final Map<String, Function<KafkaRecordWritable, Writable>>
-      recordWritableFnMap = ImmutableMap.<String, Function<KafkaRecordWritable, Writable>>builder()
-      .put(MetadataColumn.END_OFFSET.getName(), (record) -> new LongWritable(record.getEndOffset()))
-      .put(MetadataColumn.KEY.getName(),
-          record -> record.getRecordKey() == null ? null : new BytesWritable(record.getRecordKey()))
-      .put(MetadataColumn.OFFSET.getName(), record -> new LongWritable(record.getOffset()))
-      .put(MetadataColumn.PARTITION.getName(), record -> new IntWritable(record.getPartition()))
-      .put(MetadataColumn.START_OFFSET.getName(), record -> new LongWritable(record.getStartOffset()))
-      .put(MetadataColumn.TIMESTAMP.getName(), record -> new LongWritable(record.getTimestamp()))
-      .build();
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
new file mode 100644
index 0000000..2e1f6fa
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.hive.serde2.JsonSerDe;
+
+/**
+ * Table properties used by Kafka Storage handler.
+ */
+enum KafkaTableProperties {
+  /**
+   * MANDATORY Table property indicating kafka topic backing the table.
+   */
+  HIVE_KAFKA_TOPIC("kafka.topic", null),
+  /**
+   * MANDATORY Table property indicating kafka broker(s) connection string.
+   */
+  HIVE_KAFKA_BOOTSTRAP_SERVERS("kafka.bootstrap.servers", null),
+  /**
+   * Table property indicating which delegate serde is to be used.
+   */
+  SERDE_CLASS_NAME("kafka.serde.class", JsonSerDe.class.getName()),
+  /**
+   * Table property indicating poll/fetch timeout period in millis.
+   * FYI this is independent of the internal Kafka consumer timeouts.
+   */
+  KAFKA_POLL_TIMEOUT("hive.kafka.poll.timeout.ms", "5000"),
+
+  /**
+   * Table property indicating the maximum number of retries used when committing transactions from the metadata hook.
+   */
+  MAX_RETRIES("hive.kafka.max.retries", "6"),
+  /**
+   * Table property indicating the Kafka metadata fetch timeout period in millis.
+   */
+  KAFKA_FETCH_METADATA_TIMEOUT("hive.kafka.metadata.poll.timeout.ms", "30000"),
+  /**
+   * Table property indicating the write semantic possible enum values are:
+   * {@link KafkaOutputFormat.WriteSemantic}.
+   */
+  WRITE_SEMANTIC_PROPERTY("kafka.write.semantic", KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE.name()),
+  /**
+   * Table property that indicates whether we should commit within the task or delay the commit to the metadata-hook commit call.
+   */
+  HIVE_KAFKA_OPTIMISTIC_COMMIT("hive.kafka.optimistic.commit", "false");
+
+  /**
+   * Kafka storage handler table properties constructor.
+   * @param name property name.
+   * @param defaultValue default value, set to NULL if the property is mandatory and needs to be set by the user.
+   */
+  KafkaTableProperties(String name, String defaultValue) {
+    this.name = name;
+    this.defaultValue = defaultValue;
+    this.mandatory = defaultValue == null;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDefaultValue() {
+    return defaultValue;
+  }
+
+  public boolean isMandatory() {
+    return mandatory;
+  }
+
+  private final String name;
+  private final String defaultValue;
+  private final boolean mandatory;
+}
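
As a quick illustration of how the isMandatory()/getDefaultValue() contract above is meant to be consumed (mirroring the validation and back-filling done in KafkaStorageHandler.preCreateTable()), here is a hedged sketch; the topic and broker values are hypothetical, and the class sits in the handler package only because the enum is package-private.

package org.apache.hadoop.hive.kafka;

import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only, not part of the patch. */
public class TablePropertyDefaultsSketch {
  public static void main(String[] args) {
    // Hypothetical table parameters as they could arrive from the metastore:
    // the user set only the two mandatory properties.
    Map<String, String> params = new HashMap<>();
    params.put(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName(), "test-topic");
    params.put(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName(), "broker-1:9092");

    for (KafkaTableProperties property : KafkaTableProperties.values()) {
      // Reject missing mandatory keys, same check as preCreateTable().
      if (property.isMandatory() && params.get(property.getName()) == null) {
        throw new IllegalArgumentException("Set Table property " + property.getName());
      }
      // Back-fill every optional property with its default value.
      params.putIfAbsent(property.getName(), property.getDefaultValue());
    }
    // params now also holds kafka.serde.class, hive.kafka.poll.timeout.ms, kafka.write.semantic, etc.
    params.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}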