Posted to commits@hive.apache.org by ha...@apache.org on 2018/10/13 17:48:19 UTC

[3/5] hive git commit: HIVE-20639 : Add ability to Write Data from Hive Table/Query to Kafka Topic (Slim Bouguerra via Ashutosh Chauhan)

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
new file mode 100644
index 0000000..6ae9c8d
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.ReflectionUtil;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for the Kafka storage handler, plus some constants.
+ */
+final class KafkaUtils {
+
+  private KafkaUtils() {
+  }
+
+  /**
+   * Table property prefix used to inject Kafka consumer properties, e.g. "kafka.consumer.max.poll.records" = "5000"
+   * will inject max.poll.records=5000 into the Kafka consumer. Not mandatory; defaults to nothing.
+   */
+  static final String CONSUMER_CONFIGURATION_PREFIX = "kafka.consumer";
+
+  /**
+   * Table property prefix used to inject Kafka producer properties, e.g. "kafka.producer.linger.ms" = "100".
+   */
+  static final String PRODUCER_CONFIGURATION_PREFIX = "kafka.producer";
+
+  /**
+   * Set of Kafka properties that the user cannot set via DDL.
+   */
+  static final Set<String>
+      FORBIDDEN_PROPERTIES =
+      new HashSet<>(ImmutableList.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+          ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+
+  /**
+   * @param configuration Job configs
+   *
+   * @return default consumer properties
+   */
+  static Properties consumerProperties(Configuration configuration) {
+    final Properties props = new Properties();
+    // we are managing the commit offset
+    props.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, Utilities.getTaskId(configuration));
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    // we are seeking in the stream so no reset
+    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+    String brokerEndPoint = configuration.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
+      throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config "
+          + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    }
+    props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    // user can always override stuff
+    props.putAll(extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX));
+    return props;
+  }
+
+  private static Map<String, String> extractExtraProperties(final Configuration configuration, String prefix) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    final Map<String, String> kafkaProperties = configuration.getValByRegex("^" + prefix + "\\..*");
+    for (Map.Entry<String, String> entry : kafkaProperties.entrySet()) {
+      String key = entry.getKey().substring(prefix.length() + 1);
+      if (FORBIDDEN_PROPERTIES.contains(key)) {
+        throw new IllegalArgumentException("Not suppose to set Kafka Property " + key);
+      }
+      builder.put(key, entry.getValue());
+    }
+    return builder.build();
+  }
+
+  static Properties producerProperties(Configuration configuration) {
+    final String writeSemanticValue = configuration.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName());
+    final KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.valueOf(writeSemanticValue);
+    final Properties properties = new Properties();
+    String brokerEndPoint = configuration.get(KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    if (brokerEndPoint == null || brokerEndPoint.isEmpty()) {
+      throw new IllegalArgumentException("Kafka Broker End Point is missing Please set Config "
+          + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
+    }
+    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
+    // user can always override stuff
+    properties.putAll(extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX));
+    String taskId = configuration.get("mapred.task.id", null);
+    properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG,
+        taskId == null ? "random_" + UUID.randomUUID().toString() : taskId);
+    switch (writeSemantic) {
+    case BEST_EFFORT:
+      break;
+    case AT_LEAST_ONCE:
+      properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
+      //The number of acknowledgments the producer requires the leader to have received before considering a request
+      // complete; "all" means acknowledgment from all in-sync replicas.
+      properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
+      break;
+    case EXACTLY_ONCE:
+      // Assuming that the task id has the form reducerId_attemptId; the reducer id is needed to fence out zombie Kafka producers.
+      String reducerId = getTaskId(configuration);
+      //The number of acknowledgments the producer requires the leader to have received before considering a request
+      // complete; "all" means acknowledgment from all in-sync replicas.
+      properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
+      properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
+      properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, reducerId);
+      //Producer is set to be idempotent, i.e. send() retries are idempotent.
+      properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown Semantic " + writeSemantic);
+    }
+    return properties;
+  }
+
+  @SuppressWarnings("SameParameterValue") static void copyDependencyJars(Configuration conf, Class<?>... classes)
+      throws IOException {
+    Set<String> jars = new HashSet<>();
+    FileSystem localFs = FileSystem.getLocal(conf);
+    jars.addAll(conf.getStringCollection("tmpjars"));
+    jars.addAll(Arrays.stream(classes)
+        .filter(Objects::nonNull)
+        .map(clazz -> {
+          String path = Utilities.jarFinderGetJar(clazz);
+          if (path == null) {
+            throw new RuntimeException("Could not find jar for class "
+                + clazz
+                + " in order to ship it to the cluster.");
+          }
+          try {
+            if (!localFs.exists(new Path(path))) {
+              throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz);
+            }
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+          return path;
+        }).collect(Collectors.toList()));
+
+    if (jars.isEmpty()) {
+      return;
+    }
+    conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
+  }
+
+  static AbstractSerDe createDelegate(String className) {
+    final Class<? extends AbstractSerDe> clazz;
+    try {
+      //noinspection unchecked
+      clazz = (Class<? extends AbstractSerDe>) Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    // we are not setting conf thus null is okay
+    return ReflectionUtil.newInstance(clazz, null);
+  }
+
+  static ProducerRecord<byte[], byte[]> toProducerRecord(String topic, KafkaWritable value) {
+    return new ProducerRecord<>(topic,
+        value.getPartition() != -1 ? value.getPartition() : null,
+        value.getTimestamp() != -1L ? value.getTimestamp() : null,
+        value.getRecordKey(),
+        value.getValue());
+  }
+
+  /**
+   * Check if the exception is non-retriable, i.e. a show stopper; all we can do is clean up and exit.
+   * @param exception input exception object.
+   * @return true if the exception is fatal thus we only can abort and rethrow the cause.
+   */
+  static boolean exceptionIsFatal(final Throwable exception) {
+    final boolean
+        securityException =
+        exception instanceof AuthenticationException
+            || exception instanceof AuthorizationException
+            || exception instanceof SecurityDisabledException;
+
+    final boolean
+        communicationException =
+        exception instanceof InvalidTopicException
+            || exception instanceof UnknownServerException
+            || exception instanceof SerializationException
+            || exception instanceof OffsetMetadataTooLarge
+            || exception instanceof IllegalStateException;
+
+    return securityException || communicationException;
+  }
+
+  /**
+   * Computes the Kafka producer transaction id. The transaction id has to be the same across task restarts,
+   * which is why the attempt id is excluded by removing the suffix after the last `_`.
+   * Assumes the taskId format is taskId_[m-r]_attemptId.
+   *
+   * @param hiveConf Hive Configuration.
+   * @return the taskId without the attempt id.
+   */
+  static String getTaskId(Configuration hiveConf) {
+    String id = Preconditions.checkNotNull(hiveConf.get("mapred.task.id", null));
+    int index = id.lastIndexOf("_");
+    if (index != -1) {
+      return id.substring(0, index);
+    }
+    return id;
+  }
+
+}
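
A minimal sketch (not part of the diff above) of how the "kafka.consumer.*" / "kafka.producer.*" prefix stripping is expected to behave. The table-level bootstrap property name ("kafka.bootstrap.servers") and the sample task id are assumptions; the sketch lives in the same package because KafkaUtils is package-private.

    package org.apache.hadoop.hive.kafka;

    import org.apache.hadoop.conf.Configuration;
    import java.util.Properties;

    public class KafkaUtilsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed resolution of KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName().
        conf.set("kafka.bootstrap.servers", "localhost:9092");
        // Prefixed property: "kafka.consumer." is stripped before handing it to the consumer.
        conf.set("kafka.consumer.max.poll.records", "5000");
        // consumerProperties() derives the client id from the task id (sample value).
        conf.set("mapred.task.id", "attempt_1539000000000_0001_r_000001_0");
        Properties props = KafkaUtils.consumerProperties(conf);
        // Expected: max.poll.records=5000, enable.auto.commit=false, auto.offset.reset=none.
        System.out.println(props);
      }
    }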

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
new file mode 100644
index 0000000..681b666
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import javax.annotation.Nullable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Writable implementation of Kafka ConsumerRecord.
+ * Serialized in the form:
+ * {@code timestamp} (long) | {@code partition} (int) | {@code offset} (long) |
+ * {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) |
+ * {@code value} (byte []) | {@code recordKey.size()} (int) | {@code recordKey} (byte [])
+ */
+public class KafkaWritable implements Writable {
+
+  private int partition;
+  private long offset;
+  private long timestamp;
+  private byte[] value;
+  private byte[] recordKey;
+
+  /**
+   * First offset given by the input split used to pull the event {@link KafkaInputSplit#getStartOffset()}.
+   */
+  private long startOffset;
+  /**
+   * Last Offset given by the input split used to pull the event {@link KafkaInputSplit#getEndOffset()}.
+   */
+  private long endOffset;
+
+  void set(ConsumerRecord<byte[], byte[]> consumerRecord, long startOffset, long endOffset) {
+    this.partition = consumerRecord.partition();
+    this.timestamp = consumerRecord.timestamp();
+    this.offset = consumerRecord.offset();
+    this.value = consumerRecord.value();
+    this.recordKey = consumerRecord.key();
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+  }
+
+  KafkaWritable(int partition,
+      long offset,
+      long timestamp,
+      byte[] value,
+      long startOffset,
+      long endOffset,
+      @Nullable byte[] recordKey) {
+    this.partition = partition;
+    this.offset = offset;
+    this.timestamp = timestamp;
+    this.value = value;
+    this.recordKey = recordKey;
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+  }
+
+  KafkaWritable(int partition, long timestamp, byte[] value, @Nullable byte[] recordKey) {
+    this(partition, -1, timestamp, value, -1, -1, recordKey);
+  }
+
+  @SuppressWarnings("WeakerAccess") public KafkaWritable() {
+  }
+
+  @Override public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeLong(timestamp);
+    dataOutput.writeInt(partition);
+    dataOutput.writeLong(offset);
+    dataOutput.writeLong(startOffset);
+    dataOutput.writeLong(endOffset);
+    dataOutput.writeInt(value.length);
+    dataOutput.write(value);
+    if (recordKey != null) {
+      dataOutput.writeInt(recordKey.length);
+      dataOutput.write(recordKey);
+    } else {
+      dataOutput.writeInt(-1);
+    }
+  }
+
+  @Override public void readFields(DataInput dataInput) throws IOException {
+    timestamp = dataInput.readLong();
+    partition = dataInput.readInt();
+    offset = dataInput.readLong();
+    startOffset = dataInput.readLong();
+    endOffset = dataInput.readLong();
+    int dataSize = dataInput.readInt();
+    if (dataSize > 0) {
+      value = new byte[dataSize];
+      dataInput.readFully(value);
+    } else {
+      value = new byte[0];
+    }
+    int keyArraySize = dataInput.readInt();
+    if (keyArraySize > -1) {
+      recordKey = new byte[keyArraySize];
+      dataInput.readFully(recordKey);
+    } else {
+      recordKey = null;
+    }
+  }
+
+  int getPartition() {
+    return partition;
+  }
+
+  @SuppressWarnings("WeakerAccess") long getOffset() {
+    return offset;
+  }
+
+  long getTimestamp() {
+    return timestamp;
+  }
+
+  byte[] getValue() {
+    return value;
+  }
+
+  @SuppressWarnings("WeakerAccess") long getStartOffset() {
+    return startOffset;
+  }
+
+  @SuppressWarnings("WeakerAccess") long getEndOffset() {
+    return endOffset;
+  }
+
+  @Nullable byte[] getRecordKey() {
+    return recordKey;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof KafkaWritable)) {
+      return false;
+    }
+    KafkaWritable writable = (KafkaWritable) o;
+    return partition == writable.partition
+        && offset == writable.offset
+        && startOffset == writable.startOffset
+        && endOffset == writable.endOffset
+        && timestamp == writable.timestamp
+        && Arrays.equals(value, writable.value)
+        && Arrays.equals(recordKey, writable.recordKey);
+  }
+
+  @Override public int hashCode() {
+    int result = Objects.hash(partition, offset, startOffset, endOffset, timestamp);
+    result = 31 * result + Arrays.hashCode(value);
+    result = 31 * result + Arrays.hashCode(recordKey);
+    return result;
+  }
+
+  @Override public String toString() {
+    return "KafkaWritable{"
+        + "partition="
+        + partition
+        + ", offset="
+        + offset
+        + ", startOffset="
+        + startOffset
+        + ", endOffset="
+        + endOffset
+        + ", timestamp="
+        + timestamp
+        + ", value="
+        + Arrays.toString(value)
+        + ", recordKey="
+        + Arrays.toString(recordKey)
+        + '}';
+  }
+
+  Writable getHiveWritable(MetadataColumn metadataColumn) {
+    switch (metadataColumn) {
+    case OFFSET:
+      return new LongWritable(getOffset());
+    case PARTITION:
+      return new IntWritable(getPartition());
+    case TIMESTAMP:
+      return new LongWritable(getTimestamp());
+    case KEY:
+      return getRecordKey() == null ? null : new BytesWritable(getRecordKey());
+    case START_OFFSET:
+      return new LongWritable(getStartOffset());
+    case END_OFFSET:
+      return new LongWritable(getEndOffset());
+    default:
+      throw new IllegalArgumentException("Unknown metadata column [" + metadataColumn.getName() + "]");
+    }
+  }
+
+}
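
A quick round-trip sketch (not part of the diff above) of the wire layout described in the class javadoc: write() then readFields() over an in-memory stream. It sits in the same package because the value constructor is package-private; the sample values are arbitrary.

    package org.apache.hadoop.hive.kafka;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.nio.charset.StandardCharsets;

    public class KafkaWritableRoundTripSketch {
      public static void main(String[] args) throws Exception {
        KafkaWritable in = new KafkaWritable(0, 5L, 1539000000000L,
            "value".getBytes(StandardCharsets.UTF_8), 0L, 100L,
            "key".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Layout: timestamp | partition | offset | startOffset | endOffset | value.size | value | key.size | key
        in.write(new DataOutputStream(buffer));
        KafkaWritable out = new KafkaWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(in.equals(out)); // expected: true
      }
    }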

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
new file mode 100644
index 0000000..60e1aea
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Enum class for all the metadata columns appended to the Kafka row by the Hive Serializer/Deserializer.
+ *
+ * <p>
+ *<b>Design Notes:</b>
+ *
+ * It is important to note that the order in which columns are appended matters; the order is governed by:
+ * {@link MetadataColumn#KAFKA_METADATA_COLUMNS}.
+ *
+ * If you add a new column, make sure to add its Writable converter to {@link KafkaWritable}.
+ *
+ */
+enum MetadataColumn {
+
+  /**
+   * Kafka Record's offset column name added as extra metadata column to row as long.
+   */
+  OFFSET("__offset", TypeInfoFactory.longTypeInfo),
+  /**
+   * Record Kafka Partition column name added as extra meta column of type int.
+   */
+  PARTITION("__partition", TypeInfoFactory.intTypeInfo),
+  /**
+   * Record Kafka key column name added as extra meta column of type binary blob.
+   */
+  KEY("__key", TypeInfoFactory.binaryTypeInfo),
+  /**
+   * Record Timestamp column name, added as extra meta column of type long.
+   */
+  TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo),
+  /**
+   * Start offset given by the input split; this reflects the actual start of the topic partition or the start given by the split pruner.
+   */
+  // @TODO To be removed in the next PR; kept here to make the review easier.
+  START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo),
+  /**
+   * End offset given by input split at run time.
+   */
+  // @TODO To be removed in the next PR; kept here to make the review easier.
+  END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo);
+
+  /**
+   * Kafka metadata columns list that indicates the order of appearance for each column in final row.
+   */
+  private static final List<MetadataColumn>
+      KAFKA_METADATA_COLUMNS =
+      Arrays.asList(KEY, PARTITION, OFFSET, TIMESTAMP, START_OFFSET, END_OFFSET);
+
+  static final List<ObjectInspector>
+      KAFKA_METADATA_INSPECTORS =
+      KAFKA_METADATA_COLUMNS.stream().map(MetadataColumn::getObjectInspector).collect(Collectors.toList());
+
+  static final List<String>
+      KAFKA_METADATA_COLUMN_NAMES =
+      KAFKA_METADATA_COLUMNS.stream().map(MetadataColumn::getName).collect(Collectors.toList());
+
+  private final String name;
+  private final TypeInfo typeInfo;
+
+  MetadataColumn(String name, TypeInfo typeInfo) {
+    this.name = name;
+    this.typeInfo = typeInfo;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public AbstractPrimitiveWritableObjectInspector getObjectInspector() {
+    return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory.getPrimitiveTypeInfo(
+        typeInfo.getTypeName()));
+  }
+
+  private static final Map<String, MetadataColumn>
+      NAMES_MAP =
+      Arrays.stream(MetadataColumn.values()).collect(Collectors.toMap(MetadataColumn::getName, Function.identity()));
+  /**
+   * Column name to MetadataColumn instance.
+   * @param name column name.
+   * @return instance of {@link MetadataColumn} or null if column name is absent
+   */
+  @Nullable
+  static MetadataColumn forName(String name) {
+    return NAMES_MAP.get(name);
+  }
+
+}
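
A small sketch (not part of the diff above) of how a metadata column name is expected to be resolved and projected out of a KafkaWritable; same package, since both types are package-private, and the sample values are arbitrary.

    package org.apache.hadoop.hive.kafka;

    import org.apache.hadoop.io.Writable;
    import java.nio.charset.StandardCharsets;

    public class MetadataColumnSketch {
      public static void main(String[] args) {
        KafkaWritable record = new KafkaWritable(3, 42L, 1539000000000L,
            "value".getBytes(StandardCharsets.UTF_8), 0L, 100L, null);
        // forName() returns null for names that are not Kafka metadata columns.
        MetadataColumn column = MetadataColumn.forName("__offset");
        Writable value = record.getHiveWritable(column); // LongWritable wrapping 42
        System.out.println(column.getName() + " = " + value);
      }
    }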

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java
new file mode 100644
index 0000000..b2bb208
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/RetryUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Predicate;
+
+/**
+ * Retry utils class mostly taken from Apache Druid Project org.apache.druid.java.util.common.RetryUtils.
+ */
+public final class RetryUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);
+  private static final long MAX_SLEEP_MILLIS = 60000;
+  private static final long BASE_SLEEP_MILLIS = 1000;
+
+  private RetryUtils() {
+  }
+
+  /**
+   * Task to be performed.
+   * @param <T> returned type of the task.
+   */
+  public interface Task<T> {
+    /**
+     * This method is retried up to maxTries times, until it succeeds.
+     */
+    T perform() throws Exception;
+  }
+
+  /**
+   * Cleanup procedure after each failed attempt.
+   */
+  @SuppressWarnings("WeakerAccess") public interface CleanupAfterFailure {
+    /**
+     * This is called once {@link Task#perform()} fails. Retrying is stopped once this method throws an exception,
+     * so errors inside this method should be ignored if you don't want to stop retrying.
+     */
+    void cleanup();
+  }
+
+  /**
+   * Retry an operation using fuzzy exponentially increasing backoff. The wait time after the nth failed attempt is
+   * min(60000ms, 1000ms * pow(2, n - 1)), fuzzed by a number drawn from a Gaussian distribution with mean 0 and
+   * standard deviation 0.2.
+   *
+   * If maxTries is exhausted, or if shouldRetry returns false, the last exception thrown by "f" will be thrown
+   * by this function.
+   *
+   * @param f           the operation
+   * @param shouldRetry predicate determining whether we should retry after a particular exception thrown by "f"
+   * @param quietTries  first quietTries attempts will LOG exceptions at DEBUG level rather than WARN
+   * @param maxTries    maximum number of attempts
+   *
+   * @return result of the first successful operation
+   *
+   * @throws Exception if maxTries is exhausted, or shouldRetry returns false
+   */
+  @SuppressWarnings("WeakerAccess") static <T> T retry(final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries,
+      @Nullable final CleanupAfterFailure cleanupAfterFailure,
+      @Nullable final String messageOnRetry) throws Exception {
+    Preconditions.checkArgument(maxTries > 0, "maxTries > 0");
+    Preconditions.checkArgument(quietTries >= 0, "quietTries >= 0");
+    int nTry = 0;
+    final int maxRetries = maxTries - 1;
+    while (true) {
+      try {
+        nTry++;
+        return f.perform();
+      } catch (Throwable e) {
+        if (cleanupAfterFailure != null) {
+          cleanupAfterFailure.cleanup();
+        }
+        if (nTry < maxTries && shouldRetry.test(e)) {
+          awaitNextRetry(e, messageOnRetry, nTry, maxRetries, nTry <= quietTries);
+        } else {
+          Throwables.propagateIfInstanceOf(e, Exception.class);
+          throw Throwables.propagate(e);
+        }
+      }
+    }
+  }
+
+  static <T> T retry(final Task<T> f, Predicate<Throwable> shouldRetry, final int maxTries) throws Exception {
+    return retry(f, shouldRetry, 0, maxTries);
+  }
+
+  @SuppressWarnings({ "WeakerAccess", "SameParameterValue" }) static <T> T retry(final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries) throws Exception {
+    return retry(f, shouldRetry, quietTries, maxTries, null, null);
+  }
+
+  @SuppressWarnings("unused") public static <T> T retry(final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final CleanupAfterFailure onEachFailure,
+      final int maxTries,
+      final String messageOnRetry) throws Exception {
+    return retry(f, shouldRetry, 0, maxTries, onEachFailure, messageOnRetry);
+  }
+
+  private static void awaitNextRetry(final Throwable e,
+      @Nullable final String messageOnRetry,
+      final int nTry,
+      final int maxRetries,
+      final boolean quiet) throws InterruptedException {
+    final long sleepMillis = nextRetrySleepMillis(nTry);
+    final String fullMessage;
+
+    if (messageOnRetry == null) {
+      fullMessage = String.format("Retrying (%d of %d) in %,dms.", nTry, maxRetries, sleepMillis);
+    } else {
+      fullMessage = String.format("%s, retrying (%d of %d) in %,dms.", messageOnRetry, nTry, maxRetries, sleepMillis);
+    }
+
+    if (quiet) {
+      LOG.debug(fullMessage, e);
+    } else {
+      LOG.warn(fullMessage, e);
+    }
+
+    Thread.sleep(sleepMillis);
+  }
+
+  private static long nextRetrySleepMillis(final int nTry) {
+    final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * ThreadLocalRandom.current().nextGaussian(), 0), 2);
+    return (long) (Math.min(MAX_SLEEP_MILLIS, BASE_SLEEP_MILLIS * Math.pow(2, nTry - 1)) * fuzzyMultiplier);
+  }
+}
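
A minimal usage sketch (not part of the diff above) of the public retry entry point; the task, the predicate, and the retry message are placeholders.

    import org.apache.hadoop.hive.kafka.RetryUtils;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RetrySketch {
      public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Fails twice, then succeeds; retried because the predicate accepts IllegalStateException.
        String result = RetryUtils.retry(
            () -> {
              if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
              }
              return "ok";
            },
            e -> e instanceof IllegalStateException,
            null,                 // no per-failure cleanup
            5,                    // maxTries
            "placeholder task");  // messageOnRetry
        System.out.println(result + " after " + attempts.get() + " attempts");
      }
    }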

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
new file mode 100644
index 0000000..c95bdb0
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Hive to Kafka simple record writer. It can be used to achieve at-least-once semantics, or no guarantees at all.
+ */
+class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<BytesWritable, KafkaWritable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaWriter.class);
+  private static final  String
+      TIMEOUT_CONFIG_HINT =
+      "Try increasing producer property [`retries`] and [`retry.backoff.ms`] to avoid this error [{}].";
+  private static final  String
+      ABORT_MSG =
+      "Writer [%s] aborting Send. Caused by [%s]. Sending to topic [%s]. Record offset [%s];";
+  private static final String
+      ACTION_ABORT =
+      "WriterId [{}] lost record from Topic [{}], delivery Semantic [{}] -> ACTION=ABORT, ERROR caused by [{}]";
+  private static final String
+      ACTION_CARRY_ON =
+      "WriterId [{}], lost record from Topic [{}], delivery Semantic [{}] -> ACTION=CARRY-ON";
+
+  private final String topic;
+  private final String writerId;
+  private final KafkaOutputFormat.WriteSemantic writeSemantic;
+  private final KafkaProducer<byte[], byte[]> producer;
+  private final Callback callback;
+  private final AtomicReference<Exception> sendExceptionRef = new AtomicReference<>();
+  private final AtomicLong lostRecords = new AtomicLong(0L);
+  private long sentRecords = 0L;
+
+  /**
+   * @param topic Kafka Topic.
+   * @param writerId Writer id used for logging.
+   * @param atLeastOnce true if the desired delivery semantic is at least once.
+   * @param properties Kafka Producer properties.
+   */
+  SimpleKafkaWriter(String topic, @Nullable String writerId, boolean atLeastOnce, Properties properties) {
+    this.writeSemantic =
+        atLeastOnce ? KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE : KafkaOutputFormat.WriteSemantic.BEST_EFFORT;
+    this.writerId = writerId == null ? UUID.randomUUID().toString() : writerId;
+    this.topic = Preconditions.checkNotNull(topic, "Topic can not be null");
+    Preconditions.checkState(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
+        "set [" + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + "] property");
+    producer = new KafkaProducer<>(properties, new ByteArraySerializer(), new ByteArraySerializer());
+
+    this.callback = (metadata, exception) -> {
+      if (exception != null) {
+        lostRecords.getAndIncrement();
+        switch (writeSemantic) {
+        case BEST_EFFORT:
+          LOG.warn(ACTION_CARRY_ON, getWriterId(), topic, writeSemantic);
+          break;
+        case AT_LEAST_ONCE:
+          LOG.error(ACTION_ABORT, getWriterId(), topic, writeSemantic, exception.getMessage());
+          sendExceptionRef.compareAndSet(null, exception);
+          break;
+        default:
+              throw new IllegalArgumentException("Unsupported delivery semantic " + writeSemantic);
+        }
+      }
+    };
+    LOG.info("Starting WriterId [{}], Delivery Semantic [{}], Target Kafka Topic [{}]",
+        writerId, writeSemantic,
+        topic);
+  }
+
+  @Override public void write(Writable w) throws IOException {
+    checkExceptions();
+    try {
+      sentRecords++;
+      producer.send(KafkaUtils.toProducerRecord(topic, (KafkaWritable) w), callback);
+    } catch (KafkaException kafkaException) {
+      handleKafkaException(kafkaException);
+      checkExceptions();
+    }
+  }
+
+  private void handleKafkaException(KafkaException kafkaException) {
+    if (kafkaException instanceof TimeoutException) {
+      //This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up.
+      LOG.error(TIMEOUT_CONFIG_HINT, kafkaException.getMessage());
+    }
+    if (KafkaUtils.exceptionIsFatal(kafkaException)) {
+      LOG.error(String.format(ABORT_MSG, writerId, kafkaException.getMessage(), topic, -1L));
+      sendExceptionRef.compareAndSet(null, kafkaException);
+    } else {
+      if (writeSemantic == KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE) {
+        LOG.error(ACTION_ABORT, writerId, topic, writeSemantic, kafkaException.getMessage());
+        sendExceptionRef.compareAndSet(null, kafkaException);
+      } else {
+        LOG.warn(ACTION_CARRY_ON, writerId, topic, writeSemantic);
+      }
+    }
+  }
+
+  @Override public void close(boolean abort) throws IOException {
+    if (abort) {
+      LOG.info("Aborting is set to TRUE, Closing writerId [{}] without flush.", writerId);
+      producer.close(0, TimeUnit.MICROSECONDS);
+      return;
+    } else {
+      LOG.info("Flushing Kafka Producer with writerId [{}]", writerId);
+      producer.flush();
+      LOG.info("Closing WriterId [{}]", writerId);
+      producer.close();
+    }
+    LOG.info("Closed WriterId [{}] Delivery semantic [{}], Topic[{}], Total sent Records [{}], Total Lost Records [{}]",
+        writerId, writeSemantic,
+        topic,
+        sentRecords,
+        lostRecords.get());
+    checkExceptions();
+  }
+
+  @VisibleForTesting String getWriterId() {
+    return writerId;
+  }
+
+  @VisibleForTesting long getLostRecords() {
+    return lostRecords.get();
+  }
+
+  @VisibleForTesting long getSentRecords() {
+    return sentRecords;
+  }
+
+  @Override public void write(BytesWritable bytesWritable, KafkaWritable kafkaWritable) throws IOException {
+    this.write(kafkaWritable);
+  }
+
+  @Override public void close(Reporter reporter) throws IOException {
+    this.close(false);
+  }
+
+  private void checkExceptions() throws IOException {
+    if (sendExceptionRef.get() != null) {
+      LOG.error("Send Exception Aborting write from writerId [{}]", writerId);
+      producer.close(0, TimeUnit.MICROSECONDS);
+      throw new IOException(sendExceptionRef.get());
+    }
+  }
+
+}
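
A minimal write/close sketch (not part of the diff above) for the simple writer; the broker address, topic, and writer id are placeholders, and the sketch is in the same package because SimpleKafkaWriter is package-private.

    package org.apache.hadoop.hive.kafka;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class SimpleKafkaWriterSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // atLeastOnce = true: a lost record makes write()/close() fail instead of carrying on.
        SimpleKafkaWriter writer = new SimpleKafkaWriter("test_topic", "writer-1", true, props);
        KafkaWritable record = new KafkaWritable(-1, System.currentTimeMillis(),
            "hello".getBytes(StandardCharsets.UTF_8), null);
        writer.write(record);  // asynchronous send; send failures surface here or at close()
        writer.close(false);   // flush, close, and rethrow any send exception as IOException
      }
    }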

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
new file mode 100644
index 0000000..fb4d034
--- /dev/null
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/TransactionalKafkaWriter.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Transactional Kafka record writer used to achieve exactly-once semantics.
+ */
+class TransactionalKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<BytesWritable, KafkaWritable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionalKafkaWriter.class);
+  private static final String TRANSACTION_DIR = "transaction_states";
+
+  private final String topic;
+  private final HiveKafkaProducer<byte[], byte[]> producer;
+  private final Callback callback;
+  private final AtomicReference<Exception> sendExceptionRef = new AtomicReference<>();
+  private final Path openTxFileName;
+  private final boolean optimisticCommit;
+  private final FileSystem fileSystem;
+  private final Map<TopicPartition, Long> offsets = new HashMap<>();
+  private final String writerIdTopicId;
+  private final long producerId;
+  private final short producerEpoch;
+  private long sentRecords = 0L;
+
+  /**
+   * @param topic Kafka topic.
+   * @param producerProperties Kafka producer properties.
+   * @param queryWorkingPath the query working directory, as table_directory/hive_query_id.
+   *                         Used to store the state of the transaction and/or log sent records and partitions.
+   *                         For more information see:
+   *                         {@link KafkaStorageHandler#getQueryWorkingDir(org.apache.hadoop.hive.metastore.api.Table)}
+   * @param fileSystem file system handler.
+   * @param optimisticCommit if true the commit happens at the task level, otherwise it is delegated to HS2.
+   */
+  TransactionalKafkaWriter(String topic, Properties producerProperties,
+      Path queryWorkingPath,
+      FileSystem fileSystem,
+      @Nullable Boolean optimisticCommit) {
+    this.fileSystem = fileSystem;
+    this.topic = Preconditions.checkNotNull(topic, "NULL topic !!");
+
+    Preconditions.checkState(producerProperties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
+        "set [" + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + "] property");
+    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    this.producer = new HiveKafkaProducer<>(producerProperties);
+    this.optimisticCommit = optimisticCommit == null ? true : optimisticCommit;
+    this.callback = (metadata, exception) -> {
+      if (exception != null) {
+        sendExceptionRef.compareAndSet(null, exception);
+      } else {
+        //According to https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
+        //Callbacks for the same TopicPartition are invoked in order, thus this keeps track of the most recent offset.
+        final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+        offsets.put(tp, metadata.offset());
+      }
+    };
+    // Start Tx
+    assert producer.getTransactionalId() != null;
+    try {
+      producer.initTransactions();
+      producer.beginTransaction();
+    } catch (Exception exception) {
+      logHints(exception);
+      if (tryToAbortTx(exception)) {
+        LOG.error("Aborting Transaction [{}] cause by ERROR [{}]",
+            producer.getTransactionalId(),
+            exception.getMessage());
+        producer.abortTransaction();
+      }
+      LOG.error("Closing writer [{}] caused by ERROR [{}]", producer.getTransactionalId(), exception.getMessage());
+      producer.close(0, TimeUnit.MILLISECONDS);
+      throw exception;
+    }
+    writerIdTopicId = String.format("WriterId [%s], Kafka Topic [%s]", producer.getTransactionalId(), topic);
+    producerEpoch = this.optimisticCommit ? -1 : producer.getEpoch();
+    producerId = this.optimisticCommit ? -1 : producer.getProducerId();
+    LOG.info("DONE with Initialization of {}, Epoch[{}], internal ID[{}]", writerIdTopicId, producerEpoch, producerId);
+    //Writer base working directory
+    openTxFileName =
+        this.optimisticCommit ?
+            null :
+            new Path(new Path(new Path(queryWorkingPath, TRANSACTION_DIR), producer.getTransactionalId()),
+                String.valueOf(producerEpoch));
+  }
+
+  @Override public void write(Writable w) throws IOException {
+    checkExceptions();
+    try {
+      sentRecords++;
+      producer.send(KafkaUtils.toProducerRecord(topic, (KafkaWritable) w), callback);
+    } catch (Exception e) {
+      if (tryToAbortTx(e)) {
+        // producer.send() may throw a KafkaException which wraps a ProducerFencedException; rethrow its wrapped inner cause.
+        producer.abortTransaction();
+      }
+      producer.close(0, TimeUnit.MILLISECONDS);
+      sendExceptionRef.compareAndSet(null, e);
+      checkExceptions();
+    }
+  }
+
+  private void logHints(Exception e) {
+    if (e instanceof TimeoutException) {
+      LOG.error("Maybe Try to increase [`retry.backoff.ms`] to avoid this error [{}].", e.getMessage());
+    }
+  }
+
+  /**
+   * The non-abort close can be split into 2 parts.
+   * Part one flushes all the buffered records to Kafka, then logs (topic-partition, offset).
+   * Part two either commits the transaction, or saves the state of the transaction to the WAL and lets HS2 do the commit.
+   *
+   * @param abort if set to true will abort flush and exit
+   * @throws IOException exception causing the failure
+   */
+  @Override public void close(boolean abort) throws IOException {
+    if (abort) {
+      // Case Abort, try to AbortTransaction -> Close producer ASAP -> Exit;
+      LOG.warn("Aborting Transaction and Sending from {}", writerIdTopicId);
+      try {
+        producer.abortTransaction();
+      } catch (Exception e) {
+        LOG.error("Aborting Transaction {} failed due to [{}]", writerIdTopicId, e.getMessage());
+      }
+      producer.close(0, TimeUnit.MILLISECONDS);
+      return;
+    }
+
+    // Normal case -> log and commit, then close
+    LOG.info("Flushing Kafka buffer of writerId {}", writerIdTopicId);
+    producer.flush();
+
+    // No exception so far; log whatever was flushed.
+    String formattedMsg = "Topic[%s] Partition [%s] -> Last offset [%s]";
+    String
+        flushedOffsetMsg =
+        offsets.entrySet()
+            .stream()
+            .map(topicPartitionLongEntry -> String.format(formattedMsg,
+                topicPartitionLongEntry.getKey().topic(),
+                topicPartitionLongEntry.getKey().partition(),
+                topicPartitionLongEntry.getValue()))
+            .collect(Collectors.joining(","));
+
+    LOG.info("WriterId {} flushed the following [{}] ", writerIdTopicId, flushedOffsetMsg);
+    // OPTIMISTIC COMMIT OR PERSIST STATE OF THE TX_WAL
+    checkExceptions();
+    if (optimisticCommit) {
+      // Case Commit at the task level
+      commitTransaction();
+    } else {
+      // Case delegate TX commit to HS2
+      persistTxState();
+    }
+    checkExceptions();
+    producer.close();
+    LOG.info("Closed writerId [{}], Sent [{}] records to Topic [{}]",
+        producer.getTransactionalId(),
+        sentRecords,
+        topic);
+  }
+
+  private void commitTransaction() {
+    LOG.info("Attempting Optimistic commit by {}", writerIdTopicId);
+    try {
+      producer.commitTransaction();
+    } catch (Exception e) {
+      sendExceptionRef.compareAndSet(null, e);
+    }
+  }
+
+  /**
+   * Write the Kafka producer PID and epoch to the checkpoint file {@link TransactionalKafkaWriter#openTxFileName}.
+   */
+  private void persistTxState() {
+    LOG.info("Committing state to path [{}] by [{}]", openTxFileName.toString(), writerIdTopicId);
+    try (FSDataOutputStream outStream = fileSystem.create(openTxFileName)) {
+      outStream.writeLong(producerId);
+      outStream.writeShort(producerEpoch);
+    } catch (Exception e) {
+      sendExceptionRef.compareAndSet(null, e);
+    }
+  }
+
+  @Override public void write(BytesWritable bytesWritable, KafkaWritable kafkaWritable) throws IOException {
+    write(kafkaWritable);
+  }
+
+  @Override public void close(Reporter reporter) throws IOException {
+    close(false);
+  }
+
+  @VisibleForTesting long getSentRecords() {
+    return sentRecords;
+  }
+
+  @VisibleForTesting short getProducerEpoch() {
+    return producerEpoch;
+  }
+
+  @VisibleForTesting long getProducerId() {
+    return producerId;
+  }
+
+  /**
+   * Checks for an existing exception. In case of exception, closes the producer and rethrows as IOException.
+   * @throws IOException aborts if possible, closes the producer, then rethrows the exception.
+   */
+  private void checkExceptions() throws IOException {
+    if (sendExceptionRef.get() != null && sendExceptionRef.get() instanceof KafkaException && sendExceptionRef.get()
+        .getCause() instanceof ProducerFencedException) {
+      // producer.send() may throw a KafkaException which wraps a ProducerFencedException; rethrow its wrapped inner cause.
+      sendExceptionRef.updateAndGet(e -> (KafkaException) e.getCause());
+    }
+    if (sendExceptionRef.get() != null) {
+      final Exception exception = sendExceptionRef.get();
+      logHints(exception);
+      if (tryToAbortTx(exception)) {
+        LOG.error("Aborting Transaction [{}] cause by ERROR [{}]", writerIdTopicId, exception.getMessage());
+        producer.abortTransaction();
+      }
+      LOG.error("Closing writer [{}] caused by ERROR [{}]", writerIdTopicId, exception.getMessage());
+      producer.close(0, TimeUnit.MILLISECONDS);
+      throw new IOException(exception);
+    }
+  }
+
+  private boolean tryToAbortTx(Throwable e) {
+    // According to https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
+    // We can't recover from these exceptions, so our only option is to close the producer and exit.
+    boolean
+        isNotFencedOut =
+        !(e instanceof ProducerFencedException)
+            && !(e instanceof OutOfOrderSequenceException)
+            && !(e instanceof AuthenticationException);
+    // producer.send() may throw a KafkaException which wraps a FencedException therefore check inner cause.
+    boolean causeIsNotFencedOut = !(e.getCause() != null && e.getCause() instanceof ProducerFencedException);
+    return isNotFencedOut && causeIsNotFencedOut;
+  }
+
+  /**
+   * Given a query working directory of the form table_directory/hive_query_id/, fetches the open transaction states.
+   * Table directory is {@link org.apache.hadoop.hive.metastore.api.Table#getSd()#getLocation()}.
+   * The Hive query id is inferred from the JobConf; see {@link KafkaStorageHandler#getQueryId()}.
+   *
+   * The path to a transaction state is as follows:
+   * .../{@code queryWorkingDir}/{@code TRANSACTION_DIR}/{@code writerId}/{@code producerEpoch}
+   *
+   * The actual state is stored in the file {@code producerEpoch}.
+   * The file contains a {@link Long} as internal producer Id and a {@link Short} as the producer epoch.
+   * According to the Kafka API, the highest epoch corresponds to the active producer, therefore if there are multiple
+   * {@code producerEpoch} files, the maximum is picked based on {@link Short#compareTo}.
+   *
+   * @param fs File system handler.
+   * @param queryWorkingDir Query working Directory, see:
+   *                        {@link KafkaStorageHandler#getQueryWorkingDir(org.apache.hadoop.hive.metastore.api.Table)}.
+   * @return Map of Transaction Ids to Pair of Kafka Producer internal ID (Long) and producer epoch (short)
+   * @throws IOException if any of the IO operations fail.
+   */
+  static Map<String, Pair<Long, Short>> getTransactionsState(FileSystem fs, Path queryWorkingDir) throws IOException {
+    //list all current Dir
+    final Path transactionWorkingDir = new Path(queryWorkingDir, TRANSACTION_DIR);
+    final FileStatus[] files = fs.listStatus(transactionWorkingDir);
+    final Set<FileStatus>
+        transactionSet =
+        Arrays.stream(files).filter(FileStatus::isDirectory).collect(Collectors.toSet());
+    Set<Path> setOfTxPath = transactionSet.stream().map(FileStatus::getPath).collect(Collectors.toSet());
+    ImmutableMap.Builder<String, Pair<Long, Short>> builder = ImmutableMap.builder();
+    setOfTxPath.forEach(path -> {
+      final String txId = path.getName();
+      try {
+        FileStatus[] epochFiles = fs.listStatus(path);
+        // List all the Epoch if any and select the max.
+        // According to the Kafka API, a more recent version of a producer with the same transactional id has a greater epoch and the same PID.
+        Optional<Short>
+            maxEpoch =
+            Arrays.stream(epochFiles)
+                .filter(FileStatus::isFile)
+                .map(fileStatus -> Short.valueOf(fileStatus.getPath().getName()))
+                .max(Short::compareTo);
+        short
+            epoch =
+            maxEpoch.orElseThrow(() -> new RuntimeException("Missing sub directory epoch from directory ["
+                + path.toString()
+                + "]"));
+        Path openTxFileName = new Path(path, String.valueOf(epoch));
+        long internalId;
+        try (FSDataInputStream inStream = fs.open(openTxFileName)) {
+          internalId = inStream.readLong();
+          short fileEpoch = inStream.readShort();
+          if (epoch != fileEpoch) {
+            throw new RuntimeException(String.format("Was expecting [%s] but got [%s] from path [%s]",
+                epoch,
+                fileEpoch,
+                path.toString()));
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        builder.put(txId, Pair.of(internalId, epoch));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    return builder.build();
+  }
+}
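
A small sketch (not part of the diff above) of reading back the persisted transaction states described in the javadoc above, i.e. <queryWorkingDir>/transaction_states/<writerId>/<epoch>; the working-directory path is a placeholder and the sketch sits in the same package because the writer class is package-private.

    package org.apache.hadoop.hive.kafka;

    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.util.Map;

    public class TransactionStateSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Placeholder for <table_directory>/<hive_query_id>, see KafkaStorageHandler#getQueryWorkingDir.
        Path queryWorkingDir = new Path("/warehouse/kafka_table/hive_query_id_placeholder");
        // transactionalId -> (internal producer id, producer epoch), as written by persistTxState().
        Map<String, Pair<Long, Short>> states =
            TransactionalKafkaWriter.getTransactionsState(fs, queryWorkingDir);
        states.forEach((txId, pidEpoch) ->
            System.out.println(txId + " -> pid=" + pidEpoch.getLeft() + ", epoch=" + pidEpoch.getRight()));
      }
    }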

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
new file mode 100644
index 0000000..db2515c
--- /dev/null
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Test class for Hive Kafka Producer.
+ */
+@SuppressWarnings("unchecked") public class HiveKafkaProducerTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducerTest.class);
+  private static final int RECORD_NUMBER = 17384;
+  private static final byte[] KEY_BYTES = "KEY".getBytes(Charset.forName("UTF-8"));
+  private static final KafkaBrokerResource KAFKA_BROKER_RESOURCE = new KafkaBrokerResource();
+
+  private static final String TOPIC = "test-tx-producer";
+  private static final List<ProducerRecord<byte[], byte[]>>
+      RECORDS =
+      IntStream.range(0, RECORD_NUMBER).mapToObj(number -> {
+        final byte[] value = ("VALUE-" + Integer.toString(number)).getBytes(Charset.forName("UTF-8"));
+        return new ProducerRecord<>(TOPIC, value, KEY_BYTES);
+      }).collect(Collectors.toList());
+
+  @BeforeClass public static void setupCluster() throws Throwable {
+    KAFKA_BROKER_RESOURCE.before();
+  }
+
+  @AfterClass public static void tearDownCluster() {
+    KAFKA_BROKER_RESOURCE.after();
+  }
+
+  private KafkaConsumer<byte[], byte[]> consumer;
+  private Properties producerProperties;
+  private HiveKafkaProducer producer;
+
+  @Before public void setUp() {
+    LOG.info("setting up Config");
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty("enable.auto.commit", "false");
+    consumerProps.setProperty("auto.offset.reset", "none");
+    consumerProps.setProperty("bootstrap.servers", KafkaBrokerResource.BROKER_IP_PORT);
+    consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
+    consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
+    consumerProps.setProperty("request.timeout.ms", "3002");
+    consumerProps.setProperty("fetch.max.wait.ms", "3001");
+    consumerProps.setProperty("session.timeout.ms", "3001");
+    consumerProps.setProperty("metadata.max.age.ms", "100");
+    consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+    this.consumer = new KafkaConsumer<>(consumerProps);
+
+    String txId = UUID.randomUUID().toString();
+    producerProperties = new Properties();
+    producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaBrokerResource.BROKER_IP_PORT);
+    producerProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
+    producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    producer = new HiveKafkaProducer(producerProperties);
+  }
+
+  @After public void tearDown() {
+    LOG.info("tearDown");
+    consumer.close();
+    consumer = null;
+  }
+
+  @Test public void resumeTransaction() {
+    producer.initTransactions();
+    producer.beginTransaction();
+    long pid = producer.getProducerId();
+    short epoch = producer.getEpoch();
+    Assert.assertTrue(pid > -1);
+    Assert.assertTrue(epoch > -1);
+    //noinspection unchecked
+    RECORDS.forEach(producer::send);
+    producer.flush();
+    producer.close();
+
+    HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+    secondProducer.resumeTransaction(pid, epoch);
+    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.commitTransaction();
+    secondProducer.close();
+
+    Collection<TopicPartition> assignment = Collections.singletonList(new TopicPartition(TOPIC, 0));
+    consumer.assign(assignment);
+    consumer.seekToBeginning(assignment);
+    long numRecords = 0;
+    @SuppressWarnings("unchecked") final List<ConsumerRecord<byte[], byte[]>> actualRecords = new ArrayList();
+    while (numRecords < RECORD_NUMBER) {
+      ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
+      actualRecords.addAll(consumerRecords.records(new TopicPartition(TOPIC, 0)));
+      numRecords += consumerRecords.count();
+    }
+    Assert.assertEquals("Size matters !!", RECORDS.size(), actualRecords.size());
+    Iterator<ProducerRecord<byte[], byte[]>> expectedIt = RECORDS.iterator();
+    Iterator<ConsumerRecord<byte[], byte[]>> actualIt = actualRecords.iterator();
+    while (expectedIt.hasNext()) {
+      ProducerRecord<byte[], byte[]> expected = expectedIt.next();
+      ConsumerRecord<byte[], byte[]> actual = actualIt.next();
+      Assert.assertArrayEquals("value not matching", expected.value(), actual.value());
+      Assert.assertArrayEquals("key not matching", expected.key(), actual.key());
+    }
+  }
+
+  @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpochAndId() {
+    HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+    secondProducer.resumeTransaction(3434L, (short) 12);
+    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.close();
+  }
+
+  @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongEpoch() {
+    producer.initTransactions();
+    producer.beginTransaction();
+    long pid = producer.getProducerId();
+    producer.close();
+    HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+    secondProducer.resumeTransaction(pid, (short) 12);
+    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.close();
+  }
+
+  @Test(expected = org.apache.kafka.common.KafkaException.class) public void testWrongPID() {
+    producer.initTransactions();
+    producer.beginTransaction();
+    short epoch = producer.getEpoch();
+    producer.close();
+    HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
+    secondProducer.resumeTransaction(45L, epoch);
+    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.close();
+  }
+}
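
Editor's note: the resumeTransaction test above captures the handler's two-phase pattern, where one producer opens the transaction and writes, and a second producer with the same transactional.id is rehydrated from the saved pid/epoch and commits. The following is a condensed sketch of that flow outside JUnit; the broker address, topic, and payload are placeholders, and the raw types mirror the test class.

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;

final class ResumeTransactionSketch {
  private ResumeTransactionSketch() {
  }

  @SuppressWarnings("unchecked") static void writeThenCommit(String brokers, String topic, String txId) {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, txId);
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    // Write side: open the transaction, send, flush, then hand off pid/epoch instead of committing.
    HiveKafkaProducer writer = new HiveKafkaProducer(props);
    writer.initTransactions();
    writer.beginTransaction();
    writer.send(new ProducerRecord<>(topic,
        "key".getBytes(StandardCharsets.UTF_8),
        "value".getBytes(StandardCharsets.UTF_8)));
    writer.flush();
    long pid = writer.getProducerId();
    short epoch = writer.getEpoch();
    writer.close();

    // Commit side: a fresh producer with the same transactional.id resumes the open transaction.
    HiveKafkaProducer committer = new HiveKafkaProducer(props);
    committer.resumeTransaction(pid, epoch);
    // As in the test above, send an empty offsets map to check the transaction is still open.
    committer.sendOffsetsToTransaction(Collections.emptyMap(), "__dummy_consumer_group");
    committer.commitTransaction();
    committer.close();
  }
}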

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
new file mode 100644
index 0000000..fbcbe9a
--- /dev/null
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.TestUtils;
+import kafka.zk.AdminZkClient;
+import kafka.zk.EmbeddedZookeeper;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.utils.Time;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Properties;
+
+/**
+ * Test Helper Class to start and stop a kafka broker.
+ */
+class KafkaBrokerResource extends ExternalResource {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBrokerResource.class);
+  private static final String TOPIC = "TEST-CREATE_TOPIC";
+  static final String BROKER_IP_PORT = "127.0.0.1:9092";
+  private EmbeddedZookeeper zkServer;
+  private KafkaServer kafkaServer;
+  private AdminZkClient adminZkClient;
+  private Path tmpLogDir;
+
+  /**
+   * Override to set up your specific external resource.
+   *
+   * @throws Throwable if setup fails (which will disable {@code after}).
+   */
+  @Override protected void before() throws Throwable {
+    // Start the ZK and the Broker
+    LOG.info("init embedded Zookeeper");
+    zkServer = new EmbeddedZookeeper();
+    tmpLogDir = Files.createTempDirectory("kafka-log-dir-").toAbsolutePath();
+    String zkConnect = "127.0.0.1:" + zkServer.port();
+    LOG.info("init kafka broker");
+    Properties brokerProps = new Properties();
+    brokerProps.setProperty("zookeeper.connect", zkConnect);
+    brokerProps.setProperty("broker.id", "0");
+    brokerProps.setProperty("log.dir", tmpLogDir.toString());
+    brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKER_IP_PORT);
+    brokerProps.setProperty("offsets.topic.replication.factor", "1");
+    brokerProps.setProperty("transaction.state.log.replication.factor", "1");
+    brokerProps.setProperty("transaction.state.log.min.isr", "1");
+    KafkaConfig config = new KafkaConfig(brokerProps);
+    kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
+    kafkaServer.startup();
+    kafkaServer.zkClient();
+    adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+    LOG.info("Creating kafka TOPIC [{}]", TOPIC);
+    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+  }
+
+  /**
+   * Override to tear down your specific external resource.
+   */
+  @Override protected void after() {
+    super.after();
+    try {
+      FileUtils.deleteDirectory(new File(tmpLogDir.toString()));
+    } catch (IOException e) {
+      LOG.error("Error cleaning " + tmpLogDir.toString(), e);
+    }
+    if (kafkaServer != null) {
+      kafkaServer.shutdown();
+      kafkaServer.awaitShutdown();
+    }
+    zkServer.shutdown();
+  }
+
+  void deleteTopic(@SuppressWarnings("SameParameterValue") String topic) {
+    adminZkClient.deleteTopic(topic);
+  }
+}
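
Editor's note: because KafkaBrokerResource extends JUnit's ExternalResource, a test class can also wire it in as a @ClassRule instead of calling before()/after() by hand as HiveKafkaProducerTest does. A sketch follows; the test class name is illustrative.

package org.apache.hadoop.hive.kafka;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class KafkaBrokerRuleUsageSketch {
  // JUnit invokes before()/after() around the class, equivalent to the manual
  // setupCluster()/tearDownCluster() calls in HiveKafkaProducerTest.
  @ClassRule public static final KafkaBrokerResource KAFKA_BROKER_RESOURCE = new KafkaBrokerResource();

  @Test public void brokerIsReachable() {
    // Point any Kafka client at the fixed listener the resource exposes.
    String bootstrapServers = KafkaBrokerResource.BROKER_IP_PORT;
    Assert.assertNotNull(bootstrapServers);
    // ... construct a KafkaProducer/KafkaConsumer with bootstrapServers, as in the tests above.
  }
}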

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java
new file mode 100644
index 0000000..6e95a54
--- /dev/null
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaInputSplitTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.kafka;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Kafka Hadoop InputSplit Test.
+ */
+public class KafkaInputSplitTest {
+  private final KafkaInputSplit expectedInputSplit;
+
+  public KafkaInputSplitTest() {
+    String topic = "my_topic";
+    this.expectedInputSplit = new KafkaInputSplit(topic, 1, 50L, 56L, new Path("/tmp"));
+  }
+
+  @Test public void testWriteRead() throws IOException {
+    DataOutputBuffer output = new DataOutputBuffer();
+    this.expectedInputSplit.write(output);
+    KafkaInputSplit kafkaInputSplit = new KafkaInputSplit();
+    DataInputBuffer input = new DataInputBuffer();
+    input.reset(output.getData(), 0, output.getLength());
+    kafkaInputSplit.readFields(input);
+    Assert.assertEquals(this.expectedInputSplit, kafkaInputSplit);
+  }
+
+  @Test public void andRangeOverLapping() {
+    KafkaInputSplit kafkaInputSplit = new KafkaInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
+
+    KafkaInputSplit kafkaInputSplit2 = new KafkaInputSplit("test-topic", 2, 3, 200, new Path("/tmp"));
+
+    Assert.assertEquals(new KafkaInputSplit("test-topic", 2, 10, 200, new Path("/tmp")),
+        KafkaInputSplit.intersectRange(kafkaInputSplit, kafkaInputSplit2));
+
+  }
+
+  @Test public void andRangeNonOverLapping() {
+    KafkaInputSplit kafkaInputSplit = new KafkaInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
+
+    KafkaInputSplit kafkaInputSplit2 =
+        new KafkaInputSplit("test-topic", 2, 550, 700, new Path("/tmp"));
+
+    Assert.assertNull(KafkaInputSplit.intersectRange(kafkaInputSplit, kafkaInputSplit2));
+
+  }
+
+  @Test public void orRange() {
+    KafkaInputSplit kafkaInputSplit =
+        new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+    KafkaInputSplit kafkaInputSplit2 = new KafkaInputSplit("test-topic", 2, 3, 600, new Path("/tmp"));
+
+    Assert.assertEquals(kafkaInputSplit2,
+        KafkaInputSplit.unionRange(kafkaInputSplit, kafkaInputSplit2));
+
+    KafkaInputSplit kafkaInputSplit3 =
+        new KafkaInputSplit("test-topic", 2, 700, 6000, new Path("/tmp"));
+
+    Assert.assertEquals(new KafkaInputSplit("test-topic", 2, 300, 6000, new Path("/tmp")),
+        KafkaInputSplit.unionRange(kafkaInputSplit, kafkaInputSplit3));
+  }
+
+  @Test public void copyOf() {
+    KafkaInputSplit kafkaInputSplit =
+        new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+    KafkaInputSplit copyOf = KafkaInputSplit.copyOf(kafkaInputSplit);
+    Assert.assertEquals(kafkaInputSplit, copyOf);
+    Assert.assertNotSame(kafkaInputSplit, copyOf);
+  }
+
+  @Test public void testClone() {
+    KafkaInputSplit kafkaInputSplit =
+        new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+
+    KafkaInputSplit clone = KafkaInputSplit.copyOf(kafkaInputSplit);
+    Assert.assertEquals(kafkaInputSplit, clone);
+    Assert.assertNotSame(clone, kafkaInputSplit);
+
+  }
+
+  @Test public void testSlice() {
+    KafkaInputSplit kafkaInputSplit =
+        new KafkaInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
+    List<KafkaInputSplit> kafkaInputSplitList = KafkaInputSplit.slice(14, kafkaInputSplit);
+    Assert.assertEquals(kafkaInputSplitList.stream()
+        .mapToLong(kafkaPullerInputSplit1 -> kafkaPullerInputSplit1.getEndOffset()
+            - kafkaPullerInputSplit1.getStartOffset())
+        .sum(), kafkaInputSplit.getEndOffset() - kafkaInputSplit.getStartOffset());
+    Assert.assertEquals(1,
+        kafkaInputSplitList.stream()
+            .filter(kafkaPullerInputSplit1 -> kafkaInputSplit.getStartOffset()
+                == kafkaPullerInputSplit1.getStartOffset())
+            .count());
+    Assert.assertEquals(1,
+        kafkaInputSplitList.stream()
+            .filter(kafkaPullerInputSplit1 -> kafkaInputSplit.getEndOffset()
+                == kafkaPullerInputSplit1.getEndOffset())
+            .count());
+
+  }
+}
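
Editor's note: the intersect/union assertions above reduce to interval arithmetic on [startOffset, endOffset): intersection takes the max start and min end (null when the result is empty), union takes the min start and max end. The standalone sketch below illustrates that arithmetic only; it is not part of the patch, and KafkaInputSplit additionally carries topic, partition, and path.

// Minimal model of the offset-range operations exercised by the tests above.
final class OffsetRangeSketch {
  final long start;
  final long end;

  OffsetRangeSketch(long start, long end) {
    this.start = start;
    this.end = end;
  }

  /** Intersection, or null when the ranges do not overlap (cf. andRangeNonOverLapping). */
  static OffsetRangeSketch intersect(OffsetRangeSketch a, OffsetRangeSketch b) {
    long start = Math.max(a.start, b.start);
    long end = Math.min(a.end, b.end);
    return start < end ? new OffsetRangeSketch(start, end) : null;
  }

  /** Smallest range covering both inputs (cf. orRange). */
  static OffsetRangeSketch union(OffsetRangeSketch a, OffsetRangeSketch b) {
    return new OffsetRangeSketch(Math.min(a.start, b.start), Math.max(a.end, b.end));
  }
}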

http://git-wip-us.apache.org/repos/asf/hive/blob/5ace1f78/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
deleted file mode 100644
index 00f95ca..0000000
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaPullerInputSplitTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.kafka;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Kafka Hadoop InputSplit Test.
- */
-public class KafkaPullerInputSplitTest {
-  private KafkaPullerInputSplit expectedInputSplit;
-
-  public KafkaPullerInputSplitTest() {
-    String topic = "my_topic";
-    this.expectedInputSplit = new KafkaPullerInputSplit(topic, 1, 50L, 56L, new Path("/tmp"));
-  }
-
-  @Test public void testWriteRead() throws IOException {
-    DataOutput output = new DataOutputBuffer();
-    this.expectedInputSplit.write(output);
-    KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit();
-    DataInput input = new DataInputBuffer();
-    ((DataInputBuffer) input).reset(((DataOutputBuffer) output).getData(), 0, ((DataOutputBuffer) output).getLength());
-    kafkaPullerInputSplit.readFields(input);
-    Assert.assertEquals(this.expectedInputSplit, kafkaPullerInputSplit);
-  }
-
-  @Test public void andRangeOverLapping() {
-    KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
-
-    KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit("test-topic", 2, 3, 200, new Path("/tmp"));
-
-    Assert.assertEquals(new KafkaPullerInputSplit("test-topic", 2, 10, 200, new Path("/tmp")),
-        KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
-
-  }
-
-  @Test public void andRangeNonOverLapping() {
-    KafkaPullerInputSplit kafkaPullerInputSplit = new KafkaPullerInputSplit("test-topic", 2, 10, 400, new Path("/tmp"));
-
-    KafkaPullerInputSplit
-        kafkaPullerInputSplit2 =
-        new KafkaPullerInputSplit("test-topic", 2, 550, 700, new Path("/tmp"));
-
-    Assert.assertEquals(null, KafkaPullerInputSplit.intersectRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
-
-  }
-
-  @Test public void orRange() {
-    KafkaPullerInputSplit
-        kafkaPullerInputSplit =
-        new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
-
-    KafkaPullerInputSplit kafkaPullerInputSplit2 = new KafkaPullerInputSplit("test-topic", 2, 3, 600, new Path("/tmp"));
-
-    Assert.assertEquals(kafkaPullerInputSplit2,
-        KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit2));
-
-    KafkaPullerInputSplit
-        kafkaPullerInputSplit3 =
-        new KafkaPullerInputSplit("test-topic", 2, 700, 6000, new Path("/tmp"));
-
-    Assert.assertEquals(new KafkaPullerInputSplit("test-topic", 2, 300, 6000, new Path("/tmp")),
-        KafkaPullerInputSplit.unionRange(kafkaPullerInputSplit, kafkaPullerInputSplit3));
-  }
-
-  @Test public void copyOf() {
-    KafkaPullerInputSplit
-        kafkaPullerInputSplit =
-        new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
-
-    KafkaPullerInputSplit copyOf = KafkaPullerInputSplit.copyOf(kafkaPullerInputSplit);
-    Assert.assertEquals(kafkaPullerInputSplit, copyOf);
-    Assert.assertTrue(kafkaPullerInputSplit != copyOf);
-  }
-
-  @Test public void testClone() {
-    KafkaPullerInputSplit
-        kafkaPullerInputSplit =
-        new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
-
-    KafkaPullerInputSplit clone = kafkaPullerInputSplit.clone();
-    Assert.assertEquals(kafkaPullerInputSplit, clone);
-    Assert.assertTrue(clone != kafkaPullerInputSplit);
-
-  }
-
-  @Test public void testSlice() {
-    KafkaPullerInputSplit
-        kafkaPullerInputSplit =
-        new KafkaPullerInputSplit("test-topic", 2, 300, 400, new Path("/tmp"));
-    List<KafkaPullerInputSplit> kafkaPullerInputSplitList = KafkaPullerInputSplit.slice(14, kafkaPullerInputSplit);
-    Assert.assertEquals(kafkaPullerInputSplitList.stream()
-        .mapToLong(kafkaPullerInputSplit1 -> kafkaPullerInputSplit1.getEndOffset()
-            - kafkaPullerInputSplit1.getStartOffset())
-        .sum(), kafkaPullerInputSplit.getEndOffset() - kafkaPullerInputSplit.getStartOffset());
-    Assert.assertTrue(kafkaPullerInputSplitList.stream()
-        .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getStartOffset()
-            == kafkaPullerInputSplit1.getStartOffset())
-        .count() == 1);
-    Assert.assertTrue(kafkaPullerInputSplitList.stream()
-        .filter(kafkaPullerInputSplit1 -> kafkaPullerInputSplit.getEndOffset() == kafkaPullerInputSplit1.getEndOffset())
-        .count() == 1);
-
-  }
-}