Posted to commits@hive.apache.org by bs...@apache.org on 2018/10/18 17:12:53 UTC

hive git commit: HIVE-20735: Add support for Kerberos auth, remove the start/end offset columns, remove best-effort mode, and make 2PC the default for EOS (Slim Bouguerra, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master d67d52c8d -> dcaeeb472


HIVE-20735: Add support for Kerberos auth, remove the start/end offset columns, remove best-effort mode, and make 2PC the default for EOS (Slim Bouguerra, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dcaeeb47
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dcaeeb47
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dcaeeb47

Branch: refs/heads/master
Commit: dcaeeb4722553f3256c9d3f5282cc82cae7bc9b4
Parents: d67d52c
Author: Slim Bouguerra <bs...@apache.org>
Authored: Thu Oct 18 10:11:26 2018 -0700
Committer: Slim Bouguerra <bs...@apache.org>
Committed: Thu Oct 18 10:11:26 2018 -0700

----------------------------------------------------------------------
 kafka-handler/README.md                         |   6 +-
 .../hadoop/hive/kafka/KafkaOutputFormat.java    |  12 +-
 .../hadoop/hive/kafka/KafkaRecordReader.java    |  12 +-
 .../apache/hadoop/hive/kafka/KafkaSerDe.java    |   4 +-
 .../hadoop/hive/kafka/KafkaStorageHandler.java  |   7 +
 .../hadoop/hive/kafka/KafkaTableProperties.java |   1 +
 .../apache/hadoop/hive/kafka/KafkaUtils.java    |  54 +++-
 .../apache/hadoop/hive/kafka/KafkaWritable.java |  52 +---
 .../hadoop/hive/kafka/MetadataColumn.java       |  14 +-
 .../hadoop/hive/kafka/SimpleKafkaWriter.java    |  31 +--
 .../hive/kafka/KafkaRecordIteratorTest.java     |   5 +-
 .../hadoop/hive/kafka/KafkaUtilsTest.java       |  10 +-
 .../hadoop/hive/kafka/KafkaWritableTest.java    |  15 +-
 .../hive/kafka/SimpleKafkaWriterTest.java       |  22 +-
 .../clientpositive/kafka_storage_handler.q      |  44 ++--
 .../druid/kafka_storage_handler.q.out           | 244 +++++++++----------
 16 files changed, 242 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/README.md
----------------------------------------------------------------------
diff --git a/kafka-handler/README.md b/kafka-handler/README.md
index 706c77a..11b893c 100644
--- a/kafka-handler/README.md
+++ b/kafka-handler/README.md
@@ -126,8 +126,8 @@ left join wiki_kafka_hive as future_activity on
 | hive.kafka.poll.timeout.ms          | Parameter indicating Kafka Consumer poll timeout period in millis.  FYI this is independent from internal Kafka consumer timeouts. | No        | 5000 (5 Seconds)                        |
 | hive.kafka.max.retries              | Number of retries for Kafka metadata fetch operations.                                                                             | No        | 6                                       |
 | hive.kafka.metadata.poll.timeout.ms | Number of milliseconds before consumer timeout on fetching Kafka metadata.                                                         | No        | 30000 (30 Seconds)                      |
-| kafka.write.semantic                | Writer semantic, allowed values (BEST_EFFORT, AT_LEAST_ONCE, EXACTLY_ONCE)                                                         | No        | AT_LEAST_ONCE                           |
-| hive.kafka.optimistic.commit        | Boolean value indicate the if the producer should commit during task or delegate the commit to HS2.                                | No        | true                                    |
+| kafka.write.semantic                | Writer semantics, allowed values (AT_LEAST_ONCE, EXACTLY_ONCE)                                                         | No        | AT_LEAST_ONCE                           |
+
 
 ### Setting Extra Consumer/Producer properties.
 User can inject custom Kafka consumer/producer properties via the Table properties.
@@ -213,5 +213,5 @@ Then insert data into the table. Keep in mind that Kafka is an append only, thus
 ```sql
 insert into table moving_avg_wiki_kafka_hive select `channel`, `namespace`, `page`, `timestamp`, 
 avg(delta) over (order by `timestamp` asc rows between  60 preceding and current row) as avg_delta, 
-null as `__key`, null as `__partition`, -1, -1,-1, -1 from l15min_wiki;
+null as `__key`, null as `__partition`, -1, -1 from l15min_wiki;
 ```

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
index 950f731..1ddda8e 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaOutputFormat.java
@@ -49,18 +49,15 @@ public class KafkaOutputFormat implements HiveOutputFormat<NullWritable, KafkaWr
       Properties tableProperties,
       Progressable progress) {
     final String topic = jc.get(KafkaTableProperties.HIVE_KAFKA_TOPIC.getName());
-    final Boolean optimisticCommit = jc.getBoolean(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName(), true);
+    final Boolean optimisticCommit = jc.getBoolean(KafkaTableProperties.HIVE_KAFKA_OPTIMISTIC_COMMIT.getName(), false);
     final WriteSemantic
         writeSemantic =
         WriteSemantic.valueOf(jc.get(KafkaTableProperties.WRITE_SEMANTIC_PROPERTY.getName()));
     final Properties producerProperties = KafkaUtils.producerProperties(jc);
     final FileSinkOperator.RecordWriter recordWriter;
     switch (writeSemantic) {
-    case BEST_EFFORT:
-      recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), false, producerProperties);
-      break;
     case AT_LEAST_ONCE:
-      recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), true, producerProperties);
+      recordWriter = new SimpleKafkaWriter(topic, Utilities.getTaskId(jc), producerProperties);
       break;
     case EXACTLY_ONCE:
       FileSystem fs;
@@ -99,11 +96,6 @@ public class KafkaOutputFormat implements HiveOutputFormat<NullWritable, KafkaWr
    */
   enum WriteSemantic {
     /**
-     * Best effort delivery with no guarantees at all, user can set Producer properties as they wish,
-     * will carry on when possible unless it is a fatal exception.
-     */
-    BEST_EFFORT,
-    /**
      * Deliver all the record at least once unless the job fails.
      * Therefore duplicates can be introduced due to lost ACKs or Tasks retries.
      * Currently this is the default.
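
A minimal standalone sketch of the writer selection after this change (the class name and the description strings are illustrative; only the two remaining semantics, the AT_LEAST_ONCE default, and the retries/acks settings are taken from this patch):

```java
// Sketch only: kafka.write.semantic now resolves to one of two writers.
// BEST_EFFORT is gone, so unknown values fail fast in Enum.valueOf.
public final class WriteSemanticSketch {
  enum WriteSemantic { AT_LEAST_ONCE, EXACTLY_ONCE }

  static String describe(String tableProperty) {
    WriteSemantic semantic = WriteSemantic.valueOf(
        tableProperty == null ? WriteSemantic.AT_LEAST_ONCE.name() : tableProperty);
    switch (semantic) {
    case AT_LEAST_ONCE:
      return "simple writer: retries=Integer.MAX_VALUE, acks=all; duplicates possible on task retry";
    case EXACTLY_ONCE:
      return "transactional writer: commit delegated to HS2 (hive.kafka.optimistic.commit defaults to false)";
    default:
      throw new IllegalArgumentException("Unsupported write semantic " + semantic);
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(null));           // AT_LEAST_ONCE, the default
    System.out.println(describe("EXACTLY_ONCE")); // two-phase commit path
  }
}
```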

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
index 746de61..7f8353c 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordReader.java
@@ -50,8 +50,6 @@ import java.util.Properties;
   private long consumedRecords = 0L;
   private long readBytes = 0L;
   private volatile boolean started = false;
-  private long startOffset = -1L;
-  private long endOffset = Long.MAX_VALUE;
 
   @SuppressWarnings("WeakerAccess") public KafkaRecordReader() {
   }
@@ -75,13 +73,11 @@ import java.util.Properties;
   private synchronized void initialize(KafkaInputSplit inputSplit, Configuration jobConf) {
     if (!started) {
       this.config = jobConf;
-      startOffset = inputSplit.getStartOffset();
-      endOffset = inputSplit.getEndOffset();
+      long startOffset = inputSplit.getStartOffset();
+      long endOffset = inputSplit.getEndOffset();
       TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
       Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset,
-          "Start [%s] has to be positive and less or equal than End [%s]",
-          startOffset,
-          endOffset);
+          "Start [%s] has to be positive and less than or equal to End [%s]", startOffset, endOffset);
       totalNumberRecords += endOffset - startOffset;
       initConsumer();
       long
@@ -103,7 +99,7 @@ import java.util.Properties;
   @Override public boolean next(NullWritable nullWritable, KafkaWritable bytesWritable) {
     if (started && recordsCursor.hasNext()) {
       ConsumerRecord<byte[], byte[]> record = recordsCursor.next();
-      bytesWritable.set(record, startOffset, endOffset);
+      bytesWritable.set(record);
       consumedRecords += 1;
       readBytes += record.serializedValueSize();
       return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
index 51cfa24..6b2ca10 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
@@ -360,12 +360,12 @@ import java.util.stream.Collectors;
   }
 
   private static class TextBytesConverter implements BytesConverter<Text> {
-    Text text = new Text();
+    final private Text text = new Text();
     @Override public byte[] getBytes(Text writable) {
       //@TODO  There is no reason to decode then encode the string to bytes really
       //@FIXME this issue with CTRL-CHAR ^0 added by Text at the end of string and Json serd does not like that.
       try {
-        return writable.decode(writable.getBytes(), 0, writable.getLength()).getBytes(Charset.forName("UTF-8"));
+        return Text.decode(writable.getBytes(), 0, writable.getLength()).getBytes(Charset.forName("UTF-8"));
       } catch (CharacterCodingException e) {
         throw new RuntimeException(e);
       }
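
For context, a self-contained sketch of the conversion TextBytesConverter performs (the demo class and main are illustrative; it assumes hadoop-common on the classpath, as in this module): decoding is bounded to getLength(), so bytes beyond the valid payload in the Text backing array are never re-encoded, which appears to be what the @FIXME about the trailing control character guards against.

```java
// Sketch: decode only the first getLength() bytes of a Text, then re-encode as UTF-8.
import org.apache.hadoop.io.Text;

import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

public final class TextBytesSketch {
  public static byte[] toUtf8(Text writable) {
    try {
      // Static Text.decode over [0, getLength()) keeps only the valid payload,
      // even when the backing byte[] is larger than the current string.
      return Text.decode(writable.getBytes(), 0, writable.getLength())
          .getBytes(StandardCharsets.UTF_8);
    } catch (CharacterCodingException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Text t = new Text("small");
    t.set("hi"); // the backing array can still hold the old 5 bytes while getLength() is 2
    System.out.println(new String(toUtf8(t), StandardCharsets.UTF_8)); // prints "hi"
  }
}
```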

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
index 0d64cd9..d87f245 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaStorageHandler.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -172,6 +173,9 @@ import java.util.function.Predicate;
     properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
     properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
     properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG, Utilities.getTaskId(getConf()));
+    if (UserGroupInformation.isSecurityEnabled()) {
+      KafkaUtils.addKerberosJaasConf(getConf(), properties);
+    }
     table.getParameters()
         .entrySet()
         .stream()
@@ -197,6 +201,9 @@ import java.util.function.Predicate;
     properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
     properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
     properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokers);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      KafkaUtils.addKerberosJaasConf(getConf(), properties);
+    }
     table.getParameters()
         .entrySet()
         .stream()

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
index 2e1f6fa..a4ad01a 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaTableProperties.java
@@ -49,6 +49,7 @@ enum KafkaTableProperties {
    * {@link KafkaOutputFormat.WriteSemantic}.
    */
   WRITE_SEMANTIC_PROPERTY("kafka.write.semantic", KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE.name()),
+
   /**
    * Table property that indicates if we should commit within the task or delay it to the Metadata Hook Commit call.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
index 6ae9c8d..81252c5 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaUtils.java
@@ -24,8 +24,11 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -40,6 +43,8 @@ import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -55,6 +60,9 @@ import java.util.stream.Collectors;
  * Utils class for Kafka Storage handler plus some Constants.
  */
 final class KafkaUtils {
+  private final static Logger log = LoggerFactory.getLogger(KafkaUtils.class);
+  private static final String JAAS_TEMPLATE = "com.sun.security.auth.module.Krb5LoginModule required "
+      + "useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";";
 
   private KafkaUtils() {
   }
@@ -103,6 +111,10 @@ final class KafkaUtils {
     props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    //case Kerberos is On
+    if (UserGroupInformation.isSecurityEnabled()) {
+      addKerberosJaasConf(configuration, props);
+    }
     // user can always override stuff
     props.putAll(extractExtraProperties(configuration, CONSUMER_CONFIGURATION_PREFIX));
     return props;
@@ -131,18 +143,21 @@ final class KafkaUtils {
           + KafkaTableProperties.HIVE_KAFKA_BOOTSTRAP_SERVERS.getName());
     }
     properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerEndPoint);
+    //case Kerberos is On
+    if (UserGroupInformation.isSecurityEnabled()) {
+      addKerberosJaasConf(configuration, properties);
+    }
+
     // user can always override stuff
     properties.putAll(extractExtraProperties(configuration, PRODUCER_CONFIGURATION_PREFIX));
     String taskId = configuration.get("mapred.task.id", null);
     properties.setProperty(CommonClientConfigs.CLIENT_ID_CONFIG,
         taskId == null ? "random_" + UUID.randomUUID().toString() : taskId);
     switch (writeSemantic) {
-    case BEST_EFFORT:
-      break;
     case AT_LEAST_ONCE:
       properties.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
       //The number of acknowledgments the producer requires the leader to have received before considering a request as
-      // complete, all means from all replicas.
+      //complete. Here all means from all replicas.
       properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
       break;
     case EXACTLY_ONCE:
@@ -252,4 +267,37 @@ final class KafkaUtils {
     return id;
   }
 
+  /**
+   * Helper method that adds Kerberos JAAS configs to the properties.
+   * @param configuration Hive config containing kerberos key and principal
+   * @param props properties to be populated
+   */
+  static void addKerberosJaasConf(Configuration configuration, Properties props) {
+    //based on this https://kafka.apache.org/documentation/#security_jaas_client
+    props.setProperty("security.protocol", "SASL_PLAINTEXT");
+    props.setProperty("sasl.mechanism", "GSSAPI");
+    props.setProperty("sasl.kerberos.service.name", "kafka");
+
+    //Construct the principal/keytab
+    String principalHost = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
+    String keyTab = HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+    // Fall back to the LLAP keys if the HS2 conf is not set or not visible to the Task.
+    if (principalHost == null || principalHost.isEmpty() || keyTab == null || keyTab.isEmpty()) {
+      keyTab = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_FS_KERBEROS_KEYTAB_FILE);
+      principalHost = HiveConf.getVar(configuration, HiveConf.ConfVars.LLAP_FS_KERBEROS_PRINCIPAL);
+    }
+
+    String principal;
+    try {
+      principal = SecurityUtil.getServerPrincipal(principalHost, "0.0.0.0");
+    } catch (IOException e) {
+      log.error("Can not construct kerberos principal", e);
+      throw new RuntimeException(e);
+    }
+    final String jaasConf = String.format(JAAS_TEMPLATE, keyTab, principal);
+    props.setProperty("sasl.jaas.config", jaasConf);
+    log.info("Kafka client running with following JAAS = [{}]", jaasConf);
+  }
+
+
 }
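
A minimal sketch of how the SASL/GSSAPI client settings above are assembled (the keytab path, principal, and class name here are placeholders; in the patch they are resolved from HiveConf and SecurityUtil.getServerPrincipal):

```java
// Sketch: assembling the Kafka client SASL settings the way addKerberosJaasConf does.
import java.util.Properties;

public final class KerberosJaasSketch {
  private static final String JAAS_TEMPLATE =
      "com.sun.security.auth.module.Krb5LoginModule required "
          + "useKeyTab=true storeKey=true keyTab=\"%s\" principal=\"%s\";";

  public static Properties kafkaSaslProps(String keyTab, String principal) {
    Properties props = new Properties();
    props.setProperty("security.protocol", "SASL_PLAINTEXT");
    props.setProperty("sasl.mechanism", "GSSAPI");
    props.setProperty("sasl.kerberos.service.name", "kafka");
    props.setProperty("sasl.jaas.config", String.format(JAAS_TEMPLATE, keyTab, principal));
    return props;
  }

  public static void main(String[] args) {
    // Placeholder keytab/principal purely for illustration.
    System.out.println(kafkaSaslProps("/etc/security/keytabs/hive.keytab",
        "hive/host.example.com@EXAMPLE.COM").getProperty("sasl.jaas.config"));
  }
}
```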

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
index 681b666..ccf413b 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaWritable.java
@@ -34,8 +34,7 @@ import java.util.Objects;
 /**
  * Writable implementation of Kafka ConsumerRecord.
  * Serialized in the form:
- * {@code timestamp} long| {@code partition} (int) | {@code offset} (long) |
- * {@code startOffset} (long) | {@code endOffset} (long) | {@code value.size()} (int) |
+ * {@code timestamp} long| {@code partition} (int) | {@code offset} (long) | {@code value.size()} (int) |
  * {@code value} (byte []) | {@code recordKey.size()}| {@code recordKey (byte [])}
  */
 public class KafkaWritable implements Writable {
@@ -46,43 +45,24 @@ public class KafkaWritable implements Writable {
   private byte[] value;
   private byte[] recordKey;
 
-  /**
-   * Fist offset given by the input split used to pull the event {@link KafkaInputSplit#getStartOffset()}.
-   */
-  private long startOffset;
-  /**
-   * Last Offset given by the input split used to pull the event {@link KafkaInputSplit#getEndOffset()}.
-   */
-  private long endOffset;
-
-  void set(ConsumerRecord<byte[], byte[]> consumerRecord, long startOffset, long endOffset) {
+  void set(ConsumerRecord<byte[], byte[]> consumerRecord) {
     this.partition = consumerRecord.partition();
     this.timestamp = consumerRecord.timestamp();
     this.offset = consumerRecord.offset();
     this.value = consumerRecord.value();
     this.recordKey = consumerRecord.key();
-    this.startOffset = startOffset;
-    this.endOffset = endOffset;
   }
 
-  KafkaWritable(int partition,
-      long offset,
-      long timestamp,
-      byte[] value,
-      long startOffset,
-      long endOffset,
-      @Nullable byte[] recordKey) {
+  KafkaWritable(int partition, long offset, long timestamp, byte[] value, @Nullable byte[] recordKey) {
     this.partition = partition;
     this.offset = offset;
     this.timestamp = timestamp;
     this.value = value;
     this.recordKey = recordKey;
-    this.startOffset = startOffset;
-    this.endOffset = endOffset;
   }
 
   KafkaWritable(int partition, long timestamp, byte[] value, @Nullable byte[] recordKey) {
-    this(partition, -1, timestamp, value, -1, -1, recordKey);
+    this(partition, -1, timestamp, value, recordKey);
   }
 
   @SuppressWarnings("WeakerAccess") public KafkaWritable() {
@@ -92,8 +72,6 @@ public class KafkaWritable implements Writable {
     dataOutput.writeLong(timestamp);
     dataOutput.writeInt(partition);
     dataOutput.writeLong(offset);
-    dataOutput.writeLong(startOffset);
-    dataOutput.writeLong(endOffset);
     dataOutput.writeInt(value.length);
     dataOutput.write(value);
     if (recordKey != null) {
@@ -108,8 +86,6 @@ public class KafkaWritable implements Writable {
     timestamp = dataInput.readLong();
     partition = dataInput.readInt();
     offset = dataInput.readLong();
-    startOffset = dataInput.readLong();
-    endOffset = dataInput.readLong();
     int dataSize = dataInput.readInt();
     if (dataSize > 0) {
       value = new byte[dataSize];
@@ -142,14 +118,6 @@ public class KafkaWritable implements Writable {
     return value;
   }
 
-  @SuppressWarnings("WeakerAccess") long getStartOffset() {
-    return startOffset;
-  }
-
-  @SuppressWarnings("WeakerAccess") long getEndOffset() {
-    return endOffset;
-  }
-
   @Nullable byte[] getRecordKey() {
     return recordKey;
   }
@@ -164,15 +132,13 @@ public class KafkaWritable implements Writable {
     KafkaWritable writable = (KafkaWritable) o;
     return partition == writable.partition
         && offset == writable.offset
-        && startOffset == writable.startOffset
-        && endOffset == writable.endOffset
         && timestamp == writable.timestamp
         && Arrays.equals(value, writable.value)
         && Arrays.equals(recordKey, writable.recordKey);
   }
 
   @Override public int hashCode() {
-    int result = Objects.hash(partition, offset, startOffset, endOffset, timestamp);
+    int result = Objects.hash(partition, offset, timestamp);
     result = 31 * result + Arrays.hashCode(value);
     result = 31 * result + Arrays.hashCode(recordKey);
     return result;
@@ -184,10 +150,6 @@ public class KafkaWritable implements Writable {
         + partition
         + ", offset="
         + offset
-        + ", startOffset="
-        + startOffset
-        + ", endOffset="
-        + endOffset
         + ", timestamp="
         + timestamp
         + ", value="
@@ -207,10 +169,6 @@ public class KafkaWritable implements Writable {
       return new LongWritable(getTimestamp());
     case KEY:
       return getRecordKey() == null ? null : new BytesWritable(getRecordKey());
-    case START_OFFSET:
-      return new LongWritable(getStartOffset());
-    case END_OFFSET:
-      return new LongWritable(getEndOffset());
     default:
       throw new IllegalArgumentException("Unknown metadata column [" + metadataColumn.getName() + "]");
     }
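
A standalone sketch of the trimmed wire format described in the updated javadoc; the field order follows this patch, while the null-key sentinel and the class name are assumptions for illustration only.

```java
// Sketch of the serialized layout after start/end offsets were dropped:
// timestamp(long) | partition(int) | offset(long) | value.length(int) | value | key.length(int) | key
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class KafkaWritableLayoutSketch {
  public static byte[] serialize(long timestamp, int partition, long offset,
      byte[] value, byte[] recordKey) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.writeLong(timestamp);
    out.writeInt(partition);
    out.writeLong(offset);
    out.writeInt(value.length);
    out.write(value);
    if (recordKey != null) {
      out.writeInt(recordKey.length);
      out.write(recordKey);
    } else {
      out.writeInt(-1); // assumption: a sentinel meaning "no record key"
    }
    return baos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] bytes = serialize(1536449552290L, 0, 5L, "value".getBytes(), "key".getBytes());
    System.out.println("serialized size = " + bytes.length); // 8+4+8+4+5+4+3 = 36 bytes
  }
}
```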

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
index 60e1aea..15d5340 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/MetadataColumn.java
@@ -60,24 +60,14 @@ enum MetadataColumn {
   /**
    * Record Timestamp column name, added as extra meta column of type long.
    */
-  TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo),
-  /**
-   * Start offset given by the input split, this will reflect the actual start of TP or start given by split pruner.
-   */
-  // @TODO To be removed next PR it is here to make review easy
-  START_OFFSET("__start_offset", TypeInfoFactory.longTypeInfo),
-  /**
-   * End offset given by input split at run time.
-   */
-  // @TODO To be removed next PR it is here to make review easy
-  END_OFFSET("__end_offset", TypeInfoFactory.longTypeInfo);
+  TIMESTAMP("__timestamp", TypeInfoFactory.longTypeInfo);
 
   /**
    * Kafka metadata columns list that indicates the order of appearance for each column in final row.
    */
   private static final List<MetadataColumn>
       KAFKA_METADATA_COLUMNS =
-      Arrays.asList(KEY, PARTITION, OFFSET, TIMESTAMP, START_OFFSET, END_OFFSET);
+      Arrays.asList(KEY, PARTITION, OFFSET, TIMESTAMP);
 
   static final List<ObjectInspector>
       KAFKA_METADATA_INSPECTORS =

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
index c95bdb0..678e190 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/SimpleKafkaWriter.java
@@ -63,7 +63,7 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
 
   private final String topic;
   private final String writerId;
-  private final KafkaOutputFormat.WriteSemantic writeSemantic;
+  private final KafkaOutputFormat.WriteSemantic writeSemantic = KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE;
   private final KafkaProducer<byte[], byte[]> producer;
   private final Callback callback;
   private final AtomicReference<Exception> sendExceptionRef = new AtomicReference<>();
@@ -73,12 +73,9 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
   /**
    * @param topic Kafka Topic.
    * @param writerId Writer Id use for logging.
-   * @param atLeastOnce true if the desired delivery semantic is at least once.
    * @param properties Kafka Producer properties.
    */
-  SimpleKafkaWriter(String topic, @Nullable String writerId, boolean atLeastOnce, Properties properties) {
-    this.writeSemantic =
-        atLeastOnce ? KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE : KafkaOutputFormat.WriteSemantic.BEST_EFFORT;
+  SimpleKafkaWriter(String topic, @Nullable String writerId, Properties properties) {
     this.writerId = writerId == null ? UUID.randomUUID().toString() : writerId;
     this.topic = Preconditions.checkNotNull(topic, "Topic can not be null");
     Preconditions.checkState(properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
@@ -88,21 +85,13 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
     this.callback = (metadata, exception) -> {
       if (exception != null) {
         lostRecords.getAndIncrement();
-        switch (writeSemantic) {
-        case BEST_EFFORT:
-          LOG.warn(ACTION_CARRY_ON, getWriterId(), topic, writeSemantic);
-          break;
-        case AT_LEAST_ONCE:
-          LOG.error(ACTION_ABORT, getWriterId(), topic, writeSemantic, exception.getMessage());
-          sendExceptionRef.compareAndSet(null, exception);
-          break;
-        default:
-              throw new IllegalArgumentException("Unsupported delivery semantic " + writeSemantic);
-        }
+        LOG.error(ACTION_ABORT, getWriterId(), topic, writeSemantic, exception.getMessage());
+        sendExceptionRef.compareAndSet(null, exception);
       }
     };
     LOG.info("Starting WriterId [{}], Delivery Semantic [{}], Target Kafka Topic [{}]",
-        writerId, writeSemantic,
+        writerId,
+        writeSemantic,
         topic);
   }
 
@@ -126,12 +115,8 @@ class SimpleKafkaWriter implements FileSinkOperator.RecordWriter, RecordWriter<B
       LOG.error(String.format(ABORT_MSG, writerId, kafkaException.getMessage(), topic, -1L));
       sendExceptionRef.compareAndSet(null, kafkaException);
     } else {
-      if (writeSemantic == KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE) {
-        LOG.error(ACTION_ABORT, writerId, topic, writeSemantic, kafkaException.getMessage());
-        sendExceptionRef.compareAndSet(null, kafkaException);
-      } else {
-        LOG.warn(ACTION_CARRY_ON, writerId, topic, writeSemantic);
-      }
+      LOG.error(ACTION_ABORT, writerId, topic, writeSemantic, kafkaException.getMessage());
+      sendExceptionRef.compareAndSet(null, kafkaException);
     }
   }
 

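A reduced sketch of the send callback after BEST_EFFORT was removed (assumes kafka-clients on the classpath, as in this module; the wrapper class and checkExceptions method are illustrative): every failed send is counted and the first exception is kept so the task can abort later, instead of being logged and skipped.

```java
// Sketch: at-least-once send error handling; the first failure is captured, never swallowed.
import org.apache.kafka.clients.producer.Callback;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public final class AtLeastOnceCallbackSketch {
  private final AtomicReference<Exception> sendExceptionRef = new AtomicReference<>();
  private final AtomicLong lostRecords = new AtomicLong();

  Callback callback() {
    return (metadata, exception) -> {
      if (exception != null) {
        lostRecords.getAndIncrement();
        // Remember only the first failure; later ones are counted but not stored.
        sendExceptionRef.compareAndSet(null, exception);
      }
    };
  }

  void checkExceptions() {
    Exception e = sendExceptionRef.get();
    if (e != null) {
      throw new RuntimeException("Send failed, aborting task", e);
    }
  }
}
```
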
http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
index 3d3f598..ff345f9 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -166,10 +166,7 @@ import java.util.stream.IntStream;
             .map((consumerRecord) -> new KafkaWritable(consumerRecord.partition(),
                 consumerRecord.offset(),
                 consumerRecord.timestamp(),
-                consumerRecord.value(),
-                50L,
-                100L,
-                consumerRecord.key()))
+                consumerRecord.value(), consumerRecord.key()))
             .collect(Collectors.toList());
     KafkaRecordReader recordReader = new KafkaRecordReader();
     TaskAttemptContext context = new TaskAttemptContextImpl(this.conf, new TaskAttemptID());

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
index 8aebb92..640b24e 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaUtilsTest.java
@@ -67,9 +67,7 @@ public class KafkaUtilsTest {
   @Test public void testMetadataEnumLookupMapper() {
     int partition = 1;
     long offset = 5L;
-    long ts = System.currentTimeMillis();
-    long startOffset = 0L;
-    long endOffset = 200L;
+    long ts = 1000000L;
     byte[] value = "value".getBytes();
     byte[] key = "key".getBytes();
     // ORDER MATTERS here.
@@ -78,10 +76,8 @@ public class KafkaUtilsTest {
         Arrays.asList(new BytesWritable(key),
             new IntWritable(partition),
             new LongWritable(offset),
-            new LongWritable(ts),
-            new LongWritable(startOffset),
-            new LongWritable(endOffset));
-    KafkaWritable kafkaWritable = new KafkaWritable(partition, offset, ts, value, startOffset, endOffset, key);
+            new LongWritable(ts));
+    KafkaWritable kafkaWritable = new KafkaWritable(partition, offset, ts, value, key);
 
     List<Writable>
         actual =

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java
index 45bf791..73d185e 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaWritableTest.java
@@ -42,10 +42,7 @@ import java.util.Arrays;
         new KafkaWritable(record.partition(),
             record.offset(),
             record.timestamp(),
-            record.value(),
-            0L,
-            100L,
-            null);
+            record.value(), null);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream w = new DataOutputStream(baos);
     kafkaWritable.write(w);
@@ -65,10 +62,7 @@ import java.util.Arrays;
         new KafkaWritable(record.partition(),
             record.offset(),
             record.timestamp(),
-            record.value(),
-            0L,
-            100L,
-            "thisKey".getBytes());
+            record.value(), "thisKey".getBytes());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream w = new DataOutputStream(baos);
     kafkaWritable.write(w);
@@ -85,10 +79,7 @@ import java.util.Arrays;
     KafkaWritable kafkaWritable = new KafkaWritable(5,
         1000L,
         1L,
-        "value".getBytes(),
-        0L,
-        10000L,
-        "key".getBytes());
+        "value".getBytes(), "key".getBytes());
     Arrays.stream(MetadataColumn.values()).forEach(kafkaWritable::getHiveWritable);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java
----------------------------------------------------------------------
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java
index d8168e0..8a9bbc7 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/SimpleKafkaWriterTest.java
@@ -71,8 +71,7 @@ import java.util.stream.IntStream;
   }
 
   @Parameterized.Parameters public static Iterable<Object[]> data() {
-    return Arrays.asList(new Object[][] {{KafkaOutputFormat.WriteSemantic.BEST_EFFORT},
-        {KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE}});
+    return Arrays.asList(new Object[][] {{KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE}});
   }
 
   @BeforeClass public static void setupCluster() throws Throwable {
@@ -110,9 +109,7 @@ import java.util.stream.IntStream;
 
   @Test(expected = IllegalStateException.class) public void testMissingBrokerString() {
     new SimpleKafkaWriter("t",
-        null,
-        writeSemantic.equals(KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE),
-        new Properties());
+        null, new Properties());
   }
 
   @Test public void testCheckWriterId() {
@@ -121,9 +118,7 @@ import java.util.stream.IntStream;
     SimpleKafkaWriter
         writer =
         new SimpleKafkaWriter("t",
-            null,
-            writeSemantic.equals(KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE),
-            properties);
+            null, properties);
     Assert.assertNotNull(writer.getWriterId());
   }
 
@@ -135,12 +130,7 @@ import java.util.stream.IntStream;
     properties.setProperty("metadata.max.age.ms", "100");
     properties.setProperty("max.block.ms", "1000");
     KafkaWritable record = new KafkaWritable(-1, -1, "value".getBytes(), null);
-    SimpleKafkaWriter writer = new SimpleKafkaWriter("t", null, false, properties);
-    writer.write(record);
-    writer.close(false);
-    Assert.assertEquals("Expect sent records not matching", 1, writer.getSentRecords());
-    Assert.assertEquals("Expect lost records is not matching", 1, writer.getLostRecords());
-    writer = new SimpleKafkaWriter("t", null, true, properties);
+    SimpleKafkaWriter writer = new SimpleKafkaWriter("t", null, properties);
     Exception exception = null;
     try {
       writer.write(record);
@@ -160,9 +150,7 @@ import java.util.stream.IntStream;
     SimpleKafkaWriter
         writer =
         new SimpleKafkaWriter(topic,
-            null,
-            writeSemantic.equals(KafkaOutputFormat.WriteSemantic.AT_LEAST_ONCE),
-            properties);
+            null, properties);
     RECORDS_WRITABLES.forEach(kafkaRecordWritable -> {
       try {
         writer.write(kafkaRecordWritable);

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/ql/src/test/queries/clientpositive/kafka_storage_handler.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/kafka_storage_handler.q b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
index 595f032..e6cd276 100644
--- a/ql/src/test/queries/clientpositive/kafka_storage_handler.q
+++ b/ql/src/test/queries/clientpositive/kafka_storage_handler.q
@@ -14,32 +14,32 @@ TBLPROPERTIES
 
 DESCRIBE EXTENDED kafka_table;
 
-Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table;
 
 Select count(*) FROM kafka_table;
 
-Select `__partition`, `__offset`,`__start_offset`,`__end_offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where `__timestamp` > 1533960760123;
-Select `__partition`, `__offset` ,`__start_offset`,`__end_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where `__timestamp` > 533960760123;
 
-Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR
 `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0);
 
-Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5;
+Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5;
 
-Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5;
+Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5;
 
-Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5;
+Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5;
 
 -- Timestamp filter
 
-Select `__partition`,`__start_offset`,`__end_offset`, `__offset`, `user`  from kafka_table where
+Select `__partition`, `__offset`, `user`  from kafka_table where
 `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS) ;
 
 -- non existing partition
@@ -232,7 +232,8 @@ TBLPROPERTIES
 describe extended wiki_kafka_avro_table;
 
 
-select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table;
+select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+ `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table;
 
 select count(*) from wiki_kafka_avro_table;
 
@@ -241,7 +242,7 @@ select count(distinct `user`) from  wiki_kafka_avro_table;
 select sum(deltabucket), min(commentlength) from wiki_kafka_avro_table;
 
 select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090;
 
 
@@ -255,11 +256,11 @@ TBLPROPERTIES
 "kafka.serde.class"="org.apache.hadoop.hive.serde2.JsonSerDe")
 ;
 
-insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test1',5, 4.999,'key',null ,-1,1536449552290,-1,null );
+insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test1',5, 4.999,'key',null ,-1,1536449552290);
 
-insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test2',15, 14.9996666, null ,null ,-1,1536449552285,-1,-1 );
+insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test2',15, 14.9996666, null ,null ,-1,1536449552285);
 
 select * from kafka_table_insert;
 
@@ -268,10 +269,11 @@ insert into table wiki_kafka_avro_table select
 isrobot as isrobot, channel as channel,`timestamp` as `timestamp`,  flags as flags,  isunpatrolled as isunpatrolled, page as page,
 diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor,
 delta as delta, isanonymous as isanonymous, `user` as `user`,  deltabucket as detlabucket, deleted as deleted, namespace as namespace,
-`__key`, `__partition`, -1 as `__offset`,`__timestamp`, -1 as `__start_offset`, -1 as `__end_offset`
+`__key`, `__partition`, -1 as `__offset`,`__timestamp`
 from wiki_kafka_avro_table;
 
-select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table;
+select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table;
 
 select `__key`, count(1)  FROM wiki_kafka_avro_table group by `__key` order by `__key`;
 
@@ -287,12 +289,12 @@ TBLPROPERTIES
 "kafka.serde.class"="org.apache.hadoop.hive.serde2.OpenCSVSerde");
 
 ALTER TABLE kafka_table_csv SET TBLPROPERTIES ("hive.kafka.optimistic.commit"="false", "kafka.write.semantic"="EXACTLY_ONCE");
-insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp`, -1, -1 from kafka_table_insert;
+insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert;
 
-insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291,-1,null );
+insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291);
 
-insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284,-1,-1 );
+insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284);
 
 select * from kafka_table_csv;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/dcaeeb47/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
index 73f0f29..8ea2aa9 100644
--- a/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
+++ b/ql/src/test/results/clientpositive/druid/kafka_storage_handler.q.out
@@ -48,32 +48,30 @@ __key               	binary              	from deserializer
 __partition         	int                 	from deserializer   
 __offset            	bigint              	from deserializer   
 __timestamp         	bigint              	from deserializer   
-__start_offset      	bigint              	from deserializer   
-__end_offset        	bigint              	from deserializer   
 	 	 
 #### A masked pattern was here ####
 StorageHandlerInfo	 	 
 Partition(topic = test-topic, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [10]]	 	 
-PREHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+PREHOOK: query: Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition` ,`__start_offset`,`__end_offset`, `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+POSTHOOK: query: Select `__partition` , `__offset`,`__key`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta FROM kafka_table
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0	0	10	0	key	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
-0	0	10	1	key	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
-0	0	10	2	key	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
-0	0	10	3	key	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
-0	0	10	4	key	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
-0	0	10	5	key	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
-0	0	10	6	key	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
-0	0	10	7	key	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
-0	0	10	8	key	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
-0	0	10	9	key	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
+0	0	key	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
+0	1	key	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
+0	2	key	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
+0	3	key	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
+0	4	key	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
+0	5	key	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
+0	6	key	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
+0	7	key	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
+0	8	key	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
+0	9	key	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
 PREHOOK: query: Select count(*) FROM kafka_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
@@ -83,121 +81,121 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 10
-PREHOOK: query: Select `__partition`, `__offset`,`__start_offset`,`__end_offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+PREHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where `__timestamp` > 1533960760123
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`, `__offset`,`__start_offset`,`__end_offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+POSTHOOK: query: Select `__partition`, `__offset`, `__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where `__timestamp` > 1533960760123
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0	0	0	10	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
-0	1	0	10	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
-0	2	0	10	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
-0	3	0	10	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
-0	4	0	10	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
-0	5	0	10	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
-0	6	0	10	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
-0	7	0	10	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
-0	8	0	10	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
-0	9	0	10	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
-PREHOOK: query: Select `__partition`, `__offset` ,`__start_offset`,`__end_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+0	0	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
+0	1	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
+0	2	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
+0	3	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
+0	4	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
+0	5	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
+0	6	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
+0	7	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
+0	8	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
+0	9	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
+PREHOOK: query: Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where `__timestamp` > 533960760123
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`, `__offset` ,`__start_offset`,`__end_offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+POSTHOOK: query: Select `__partition`, `__offset` ,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where `__timestamp` > 533960760123
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0	0	0	10	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
-0	1	0	10	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
-0	2	0	10	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
-0	3	0	10	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
-0	4	0	10	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
-0	5	0	10	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
-0	6	0	10	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
-0	7	0	10	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
-0	8	0	10	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
-0	9	0	10	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
-PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+0	0	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
+0	1	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
+0	2	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
+0	3	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
+0	4	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
+0	5	NULL	Gypsy Danger	nuclear	en	United States	North America	article	true	true	false	false	57	200	-143
+0	6	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
+0	7	NULL	Cherno Alpha	masterYi	ru	Russia	Asia	article	true	false	false	true	123	12	111
+0	8	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
+0	9	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
+PREHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR
 `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
+POSTHOOK: query: Select `__partition`, `__offset`,`__time`, `page`, `user`, `language`, `country`,`continent`, `namespace`, `newPage` ,
 `unpatrolled` , `anonymous` , `robot` , added , deleted , delta
 from kafka_table where (`__offset` > 7 and `__partition` = 0 and `__offset` <9 ) OR
 `__offset` = 4 and `__partition` = 0 OR (`__offset` <= 1 and `__partition` = 0 and `__offset` > 0)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0	1	9	1	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
-0	1	9	4	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
-0	1	9	8	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
-PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
+0	1	NULL	Striker Eureka	speed	en	Australia	Australia	wikipedia	true	false	false	true	459	129	330
+0	4	NULL	Coyote Tango	stringer	ja	Japan	Asia	wikipedia	false	true	false	true	1	10	-9
+0	8	NULL	Crimson Typhoon	triplets	zh	China	Asia	wikipedia	false	true	false	true	905	5	900
+PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
+POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` = 5
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-key	0	5	6	5	NULL	Gypsy Danger	nuclear
-PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
+key	0	5	NULL	Gypsy Danger	nuclear
+PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
+POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` < 5
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-key	0	0	5	0	NULL	Gypsy Danger	nuclear
-key	0	0	5	1	NULL	Striker Eureka	speed
-key	0	0	5	2	NULL	Cherno Alpha	masterYi
-key	0	0	5	3	NULL	Crimson Typhoon	triplets
-key	0	0	5	4	NULL	Coyote Tango	stringer
-PREHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
+key	0	0	NULL	Gypsy Danger	nuclear
+key	0	1	NULL	Striker Eureka	speed
+key	0	2	NULL	Cherno Alpha	masterYi
+key	0	3	NULL	Crimson Typhoon	triplets
+key	0	4	NULL	Coyote Tango	stringer
+PREHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__key`,`__partition`,`__start_offset`,`__end_offset`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
+POSTHOOK: query: Select `__key`,`__partition`, `__offset`,`__time`, `page`, `user` from kafka_table where `__offset` > 5
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-key	0	6	10	6	NULL	Striker Eureka	speed
-key	0	6	10	7	NULL	Cherno Alpha	masterYi
-key	0	6	10	8	NULL	Crimson Typhoon	triplets
-key	0	6	10	9	NULL	Coyote Tango	stringer
-PREHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`, `user`  from kafka_table where
+key	0	6	NULL	Striker Eureka	speed
+key	0	7	NULL	Cherno Alpha	masterYi
+key	0	8	NULL	Crimson Typhoon	triplets
+key	0	9	NULL	Coyote Tango	stringer
+PREHOOK: query: Select `__partition`, `__offset`, `user`  from kafka_table where
 `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: Select `__partition`,`__start_offset`,`__end_offset`, `__offset`, `user`  from kafka_table where
+POSTHOOK: query: Select `__partition`, `__offset`, `user`  from kafka_table where
 `__timestamp` >  1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' HOURS)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-0	0	10	0	nuclear
-0	0	10	1	speed
-0	0	10	2	masterYi
-0	0	10	3	triplets
-0	0	10	4	stringer
-0	0	10	5	nuclear
-0	0	10	6	speed
-0	0	10	7	masterYi
-0	0	10	8	triplets
-0	0	10	9	stringer
+0	0	nuclear
+0	1	speed
+0	2	masterYi
+0	3	triplets
+0	4	stringer
+0	5	nuclear
+0	6	speed
+0	7	masterYi
+0	8	triplets
+0	9	stringer
 PREHOOK: query: Select  count(*) from kafka_table where `__partition` = 1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table
@@ -635,17 +633,17 @@ POSTHOOK: query: SELECT * FROM wiki_kafka_avro_table_1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table_1
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-key-0	0	0	1534736225090	0	11
-key-1	0	1	1534739825090	0	11
-key-2	0	2	1534743425090	0	11
-key-3	0	3	1534747025090	0	11
-key-4	0	4	1534750625090	0	11
-key-5	0	5	1534754225090	0	11
-key-6	0	6	1534757825090	0	11
-key-7	0	7	1534761425090	0	11
-key-8	0	8	1534765025090	0	11
-key-9	0	9	1534768625090	0	11
-key-10	0	10	1534772225090	0	11
+key-0	0	0	1534736225090
+key-1	0	1	1534739825090
+key-2	0	2	1534743425090
+key-3	0	3	1534747025090
+key-4	0	4	1534750625090
+key-5	0	5	1534754225090
+key-6	0	6	1534757825090
+key-7	0	7	1534761425090
+key-8	0	8	1534765025090
+key-9	0	9	1534768625090
+key-10	0	10	1534772225090
 PREHOOK: query: SELECT  COUNT (*) from wiki_kafka_avro_table_1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table_1
@@ -825,17 +823,17 @@ __key               	binary              	from deserializer
 __partition         	int                 	from deserializer   
 __offset            	bigint              	from deserializer   
 __timestamp         	bigint              	from deserializer   
-__start_offset      	bigint              	from deserializer   
-__end_offset        	bigint              	from deserializer   
 	 	 
 #### A masked pattern was here ####
 StorageHandlerInfo	 	 
 Partition(topic = wiki_kafka_avro_table, partition = 0, leader = 1, replicas = [1], isr = [1], offlineReplicas = []) [start offset = [0], end offset = [11]]	 	 
-PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+ `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+ `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
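
(Illustrative sketch only, not part of the committed q files; table name and literals below are assumptions.) With the per-row `__start_offset`/`__end_offset` columns gone, per-partition offset ranges are read from the StorageHandlerInfo section of DESCRIBE EXTENDED, while row-level reads keep the remaining metadata columns:

-- Assumed table name; partition start/end offsets appear in StorageHandlerInfo.
DESCRIBE EXTENDED wiki_kafka_avro_table;

-- Row-level Kafka metadata is still available for projection and offset/partition filtering.
SELECT `__key`, `__partition`, `__offset`, `__timestamp`, `user`, `page`
FROM wiki_kafka_avro_table
WHERE `__partition` = 0 AND `__offset` > 5;
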
@@ -878,23 +876,23 @@ POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 5522.000000000001	0
 PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
 POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__timestamp` as kafka_record_ts_long,
-`__partition`, `__start_offset`,`__end_offset`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
+`__partition`, `__key`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`,
 `isanonymous`, `commentlength` from wiki_kafka_avro_table where `__timestamp` > 1534750625090
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-2018-08-20 08:37:05.09	1534754225090	0	5	11	key-5	5	08/20/2018 01:37:05	test-user-5	page is 500	-5	502.0	true	5
-2018-08-20 09:37:05.09	1534757825090	0	5	11	key-6	6	08/20/2018 02:37:05	test-user-6	page is 600	-6	602.4000000000001	false	6
-2018-08-20 10:37:05.09	1534761425090	0	5	11	key-7	7	08/20/2018 03:37:05	test-user-7	page is 700	-7	702.8000000000001	true	7
-2018-08-20 11:37:05.09	1534765025090	0	5	11	key-8	8	08/20/2018 04:37:05	test-user-8	page is 800	-8	803.2	true	8
-2018-08-20 12:37:05.09	1534768625090	0	5	11	key-9	9	08/20/2018 05:37:05	test-user-9	page is 900	-9	903.6	false	9
-2018-08-20 13:37:05.09	1534772225090	0	5	11	key-10	10	08/20/2018 06:37:05	test-user-10	page is 1000	-10	1004.0	true	10
+2018-08-20 08:37:05.09	1534754225090	0	key-5	5	08/20/2018 01:37:05	test-user-5	page is 500	-5	502.0	true	5
+2018-08-20 09:37:05.09	1534757825090	0	key-6	6	08/20/2018 02:37:05	test-user-6	page is 600	-6	602.4000000000001	false	6
+2018-08-20 10:37:05.09	1534761425090	0	key-7	7	08/20/2018 03:37:05	test-user-7	page is 700	-7	702.8000000000001	true	7
+2018-08-20 11:37:05.09	1534765025090	0	key-8	8	08/20/2018 04:37:05	test-user-8	page is 800	-8	803.2	true	8
+2018-08-20 12:37:05.09	1534768625090	0	key-9	9	08/20/2018 05:37:05	test-user-9	page is 900	-9	903.6	false	9
+2018-08-20 13:37:05.09	1534772225090	0	key-10	10	08/20/2018 06:37:05	test-user-10	page is 1000	-10	1004.0	true	10
 PREHOOK: query: CREATE EXTERNAL TABLE kafka_table_insert
 (c_name string, c_int int, c_float float)
 STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
@@ -917,23 +915,23 @@ TBLPROPERTIES
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@kafka_table_insert
-PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test1',5, 4.999,'key',null ,-1,1536449552290,-1,null )
+PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test1',5, 4.999,'key',null ,-1,1536449552290)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@kafka_table_insert
-POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test1',5, 4.999,'key',null ,-1,1536449552290,-1,null )
+POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test1',5, 4.999,'key',null ,-1,1536449552290)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@kafka_table_insert
-PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test2',15, 14.9996666, null ,null ,-1,1536449552285,-1,-1 )
+PREHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test2',15, 14.9996666, null ,null ,-1,1536449552285)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@kafka_table_insert
-POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test2',15, 14.9996666, null ,null ,-1,1536449552285,-1,-1 )
+POSTHOOK: query: insert into table kafka_table_insert (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test2',15, 14.9996666, null ,null ,-1,1536449552285)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@kafka_table_insert
@@ -945,13 +943,13 @@ POSTHOOK: query: select * from kafka_table_insert
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table_insert
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-test1	5	4.999	key	0	0	1536449552290	0	2
-test2	15	14.999666	NULL	0	1	1536449552285	0	2
+test1	5	4.999	key	0	0	1536449552290
+test2	15	14.999666	NULL	0	1	1536449552285
 PREHOOK: query: insert into table wiki_kafka_avro_table select
 isrobot as isrobot, channel as channel,`timestamp` as `timestamp`,  flags as flags,  isunpatrolled as isunpatrolled, page as page,
 diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor,
 delta as delta, isanonymous as isanonymous, `user` as `user`,  deltabucket as detlabucket, deleted as deleted, namespace as namespace,
-`__key`, `__partition`, -1 as `__offset`,`__timestamp`, -1 as `__start_offset`, -1 as `__end_offset`
+`__key`, `__partition`, -1 as `__offset`,`__timestamp`
 from wiki_kafka_avro_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
@@ -960,16 +958,18 @@ POSTHOOK: query: insert into table wiki_kafka_avro_table select
 isrobot as isrobot, channel as channel,`timestamp` as `timestamp`,  flags as flags,  isunpatrolled as isunpatrolled, page as page,
 diffurl as diffurl, added as added, comment as comment, commentlength as commentlength, isnew as isnew, isminor as isminor,
 delta as delta, isanonymous as isanonymous, `user` as `user`,  deltabucket as detlabucket, deleted as deleted, namespace as namespace,
-`__key`, `__partition`, -1 as `__offset`,`__timestamp`, -1 as `__start_offset`, -1 as `__end_offset`
+`__key`, `__partition`, -1 as `__offset`,`__timestamp`
 from wiki_kafka_avro_table
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: default@wiki_kafka_avro_table
-PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+PREHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@wiki_kafka_avro_table
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`, `page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
+POSTHOOK: query: select cast ((`__timestamp`/1000) as timestamp) as kafka_record_ts, `__partition`, `__offset`, `timestamp`, `user`,
+`page`, `deleted`, `deltabucket`, `isanonymous`, `commentlength` from wiki_kafka_avro_table
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@wiki_kafka_avro_table
 POSTHOOK: Output: hdfs://### HDFS PATH ###
@@ -1061,31 +1061,31 @@ POSTHOOK: query: ALTER TABLE kafka_table_csv SET TBLPROPERTIES ("hive.kafka.opti
 POSTHOOK: type: ALTERTABLE_PROPERTIES
 POSTHOOK: Input: default@kafka_table_csv
 POSTHOOK: Output: default@kafka_table_csv
-PREHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp`, -1, -1 from kafka_table_insert
+PREHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert
 PREHOOK: type: QUERY
 PREHOOK: Input: default@kafka_table_insert
 PREHOOK: Output: default@kafka_table_csv
-POSTHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp`, -1, -1 from kafka_table_insert
+POSTHOOK: query: insert into table kafka_table_csv select c_name, c_int, c_float, `__key`, `__partition`, -1 as `__offset`, `__timestamp` from kafka_table_insert
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table_insert
 POSTHOOK: Output: default@kafka_table_csv
-PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291,-1,null )
+PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@kafka_table_csv
-POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291,-1,null )
+POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float,`__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test4',-5, -4.999,'key-2',null ,-1,1536449552291)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@kafka_table_csv
-PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284,-1,-1 )
+PREHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@kafka_table_csv
-POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`, `__start_offset`, `__end_offset`)
-values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284,-1,-1 )
+POSTHOOK: query: insert into table kafka_table_csv (c_name,c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
+values ('test5',-15, -14.9996666, 'key-3' ,null ,-1,1536449552284)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@kafka_table_csv
@@ -1097,7 +1097,7 @@ POSTHOOK: query: select * from kafka_table_csv
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@kafka_table_csv
 POSTHOOK: Output: hdfs://### HDFS PATH ###
-test1	5	4.999	key	0	0	1536449552290	0	7
-test2	15	14.999666	NULL	0	1	1536449552285	0	7
-test4	-5	-4.999	key-2	0	3	1536449552291	0	7
-test5	-15	-14.9996666	key-3	0	5	1536449552284	0	7
+test1	5	4.999	key	0	0	1536449552290
+test2	15	14.999666	NULL	0	1	1536449552285
+test4	-5	-4.999	key-2	0	3	1536449552291
+test5	-15	-14.9996666	key-3	0	5	1536449552284
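
(A minimal write-side sketch under the same assumptions; the row values are hypothetical and the table's TBLPROPERTIES are elided here.) Inserts now supply only the four metadata columns, passing NULL for `__partition` and -1 for `__offset`; the partition and offset actually assigned by Kafka are what later reads return:

-- Hypothetical row; NULL partition and -1 offset are placeholders filled in at write time.
INSERT INTO TABLE kafka_table_insert (c_name, c_int, c_float, `__key`, `__partition`, `__offset`, `__timestamp`)
VALUES ('test6', 25, 24.5, 'key-4', null, -1, 1536449552300);

-- Read back the metadata columns to see the assigned partition and offset.
SELECT c_name, `__key`, `__partition`, `__offset`, `__timestamp` FROM kafka_table_insert;
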