You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/05/15 22:40:29 UTC

[09/50] [abbrv] hive git commit: HIVE-19173 : Add Storage Handler runtime information as part of DESCRIBE EXTENDED (Nishant Bangarwa via Ashutosh Chauhan)

HIVE-19173 : Add Storage Handler runtime information as part of DESCRIBE EXTENDED (Nishant Bangarwa via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa040c5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa040c5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa040c5b

Branch: refs/heads/branch-3.0.0
Commit: aa040c5bfcea2257b4aa89f39832a7d6198a43a0
Parents: b135724
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Fri Apr 13 09:43:00 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 8 16:05:54 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  |  70 +++++-
 .../hive/druid/DruidStorageHandlerInfo.java     |  72 ++++++
 .../hive/druid/json/KafkaSupervisorReport.java  | 231 +++++++++++++++++++
 .../hadoop/hive/druid/json/TaskReportData.java  | 125 ++++++++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   7 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  62 +++--
 .../hive/ql/metadata/HiveStorageHandler.java    |  17 +-
 .../hive/ql/metadata/StorageHandlerInfo.java    |  38 +++
 .../formatting/JsonMetaDataFormatter.java       |   6 +-
 .../metadata/formatting/MetaDataFormatter.java  |   4 +-
 .../formatting/TextMetaDataFormatter.java       |  11 +-
 .../clientpositive/druidkafkamini_basic.q       |   6 +-
 .../druid/druidkafkamini_basic.q.out            |  26 ++-
 13 files changed, 633 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index bc08bd8..3e707e3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -35,6 +35,7 @@ import com.metamx.http.client.HttpClientInit;
 import com.metamx.http.client.Request;
 import com.metamx.http.client.response.StatusResponseHandler;
 import com.metamx.http.client.response.StatusResponseHolder;
+
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -59,6 +60,7 @@ import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.storage.hdfs.HdfsDataSegmentPusher;
 import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
 import io.druid.timeline.DataSegment;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
 import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
 import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
 import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;
 import org.apache.hadoop.hive.druid.security.KerberosHttpClient;
@@ -82,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -94,6 +98,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
+
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
@@ -116,6 +121,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
+
 import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
 
 /**
@@ -454,7 +461,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
         console.printInfo("Druid Kafka Ingestion Reset successful.");
       } else {
         throw new IOException(String
-            .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
+            .format("Unable to reset Kafka Ingestion Druid status [%d] full response [%s]",
                 response.getStatus().getCode(), response.getContent()));
       }
     } catch (Exception e) {
@@ -486,7 +493,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
 
   }
 
-  public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
+  private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
     // Stop Kafka Ingestion first
     final String overlordAddress = Preconditions.checkNotNull(HiveConf
             .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
@@ -512,7 +519,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
         return null;
       } else {
         throw new IOException(String
-            .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
+            .format("Unable to fetch Kafka Ingestion Spec from Druid status [%d] full response [%s]",
                 response.getStatus().getCode(), response.getContent()));
       }
     } catch (Exception e) {
@@ -521,6 +528,46 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
   }
 
   /**
+   * Fetches the Kafka supervisor status report from the Druid overlord.
+   * @param table Hive table whose Druid datasource property identifies the supervisor to query
+   * @return kafka supervisor report, or null when no supervisor is found or the overlord request fails.
+   */
+  @Nullable
+  private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) {
+    final String overlordAddress = Preconditions.checkNotNull(HiveConf
+                    .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
+            "Druid Overlord Address is null");
+    String dataSourceName = Preconditions
+            .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
+                    "Druid Datasource name is null");
+    try {
+      StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
+                      new URL(String
+                              .format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress,
+                                      dataSourceName))),
+              new StatusResponseHandler(
+                      Charset.forName("UTF-8"))).get(),
+              input -> input instanceof IOException,
+              getMaxRetryCount());
+      if (response.getStatus().equals(HttpResponseStatus.OK)) {
+        return DruidStorageHandlerUtils.JSON_MAPPER
+                .readValue(response.getContent(), KafkaSupervisorReport.class);
+        // Druid Returns 400 Bad Request when not found.
+      } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
+        LOG.info("No Kafka Supervisor found for datasource[{}]", dataSourceName);
+        return null;
+      } else {
+        LOG.error("Unable to fetch Kafka Supervisor status [{}] full response [{}]",
+                        response.getStatus().getCode(), response.getContent());
+        return null;
+      }
+    } catch (Exception e) {
+      LOG.error("Exception while fetching kafka supervisor report from druid", e);
+      return null;
+    }
+  }
+  
+  /**
    * Creates metadata moves then commit the Segment's metadata to Druid metadata store in one TxN
    *
    * @param table Hive table
@@ -995,6 +1042,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
       updateKafkaIngestion(table);
     }
   }
+
   private static <T> Boolean getBooleanProperty(Table table, String propertyName) {
     String val = getTableProperty(table, propertyName);
     if (val == null) {
@@ -1057,4 +1105,20 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
   private int getMaxRetryCount() {
     return HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
   }
+
+  @Override
+  public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
+    if(isKafkaStreamingTable(table)){
+        KafkaSupervisorReport kafkaSupervisorReport = fetchKafkaSupervisorReport(table);
+        if(kafkaSupervisorReport == null){
+          return DruidStorageHandlerInfo.UNREACHABLE;
+        }
+        return new DruidStorageHandlerInfo(kafkaSupervisorReport);
+    }
+    else
+      // TODO: Currently we do not expose any runtime info for non-streaming tables.
+      // In future, extend this to add more information regarding table status.
+      // e.g. Total size of segments in druid, loadstatus of table on historical nodes etc.
+      return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java
new file mode 100644
index 0000000..f0e1750
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import io.druid.java.util.common.StringUtils;
+
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
+
+/**
+ * DruidStorageHandlerInfo provides runtime information for DruidStorageHandler.
+ */
+@SuppressWarnings("serial")
+public class DruidStorageHandlerInfo implements StorageHandlerInfo {
+
+  public static final StorageHandlerInfo UNREACHABLE = new StorageHandlerInfo() {
+    @Override
+    public String formatAsText() {
+      return "Druid Overlord is Unreachable, Runtime Status : unknown";
+    }
+  };
+
+  private final KafkaSupervisorReport kafkaSupervisorReport;
+
+  DruidStorageHandlerInfo(KafkaSupervisorReport kafkaSupervisorReport) {
+    this.kafkaSupervisorReport = kafkaSupervisorReport;
+  }
+
+  @Override
+  public String formatAsText() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Druid Storage Handler Runtime Status for " + kafkaSupervisorReport.getId());
+    sb.append("\n");
+    sb.append("kafkaPartitions=" + kafkaSupervisorReport.getPayload().getPartitions());
+    sb.append("\n");
+    sb.append("activeTasks=" + kafkaSupervisorReport.getPayload().getActiveTasks());
+    sb.append("\n");
+    sb.append("publishingTasks=" + kafkaSupervisorReport.getPayload().getPublishingTasks());
+    if (kafkaSupervisorReport.getPayload().getLatestOffsets() != null) {
+      sb.append("\n");
+      sb.append("latestOffsets=" + kafkaSupervisorReport.getPayload().getLatestOffsets());
+    }
+    if (kafkaSupervisorReport.getPayload().getMinimumLag() != null) {
+      sb.append("\n");
+      sb.append("minimumLag=" + kafkaSupervisorReport.getPayload().getMinimumLag());
+    }
+    if (kafkaSupervisorReport.getPayload().getAggregateLag() != null) {
+      sb.append("\n");
+      sb.append("aggregateLag=" + kafkaSupervisorReport.getPayload().getAggregateLag());
+    }
+    if (kafkaSupervisorReport.getPayload().getOffsetsLastUpdated() != null) {
+      sb.append("\n");
+      sb.append("lastUpdateTime=" + kafkaSupervisorReport.getPayload().getOffsetsLastUpdated());
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
new file mode 100644
index 0000000..5a6756e
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import io.druid.guice.annotations.Json;
+import io.druid.indexing.overlord.supervisor.SupervisorReport;
+import io.druid.java.util.common.IAE;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class KafkaSupervisorReport extends SupervisorReport
+{
+  public static class KafkaSupervisorReportPayload
+  {
+    private final String dataSource;
+    private final String topic;
+    private final Integer partitions;
+    private final Integer replicas;
+    private final Long durationSeconds;
+    private final List<TaskReportData> activeTasks;
+    private final List<TaskReportData> publishingTasks;
+    private final Map<Integer, Long> latestOffsets;
+    private final Map<Integer, Long> minimumLag;
+    private final Long aggregateLag;
+    private final DateTime offsetsLastUpdated;
+
+    @JsonCreator
+    public KafkaSupervisorReportPayload(
+        @JsonProperty("dataSource") String dataSource,
+        @JsonProperty("topic") String topic,
+        @JsonProperty("partitions") Integer partitions,
+        @JsonProperty("replicas") Integer replicas,
+        @JsonProperty("durationSeconds") Long durationSeconds,
+        @Nullable @JsonProperty("latestOffsets") Map<Integer, Long> latestOffsets,
+        @Nullable @JsonProperty("minimumLag") Map<Integer, Long> minimumLag,
+        @Nullable @JsonProperty("aggregateLag") Long aggregateLag,
+        @Nullable @JsonProperty("offsetsLastUpdated") DateTime offsetsLastUpdated
+    )
+    {
+      this.dataSource = dataSource;
+      this.topic = topic;
+      this.partitions = partitions;
+      this.replicas = replicas;
+      this.durationSeconds = durationSeconds;
+      this.activeTasks = Lists.newArrayList();
+      this.publishingTasks = Lists.newArrayList();
+      this.latestOffsets = latestOffsets;
+      this.minimumLag = minimumLag;
+      this.aggregateLag = aggregateLag;
+      this.offsetsLastUpdated = offsetsLastUpdated;
+    }
+
+    @JsonProperty
+    public String getDataSource()
+    {
+      return dataSource;
+    }
+
+    @JsonProperty
+    public String getTopic()
+    {
+      return topic;
+    }
+
+    @JsonProperty
+    public Integer getPartitions()
+    {
+      return partitions;
+    }
+
+    @JsonProperty
+    public Integer getReplicas()
+    {
+      return replicas;
+    }
+
+    @JsonProperty
+    public Long getDurationSeconds()
+    {
+      return durationSeconds;
+    }
+
+    @JsonProperty
+    public List<TaskReportData> getActiveTasks()
+    {
+      return activeTasks;
+    }
+
+    @JsonProperty
+    public List<TaskReportData> getPublishingTasks()
+    {
+      return publishingTasks;
+    }
+
+    @JsonProperty
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public Map<Integer, Long> getLatestOffsets()
+    {
+      return latestOffsets;
+    }
+
+    @JsonProperty
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public Map<Integer, Long> getMinimumLag()
+    {
+      return minimumLag;
+    }
+
+    @JsonProperty
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public Long getAggregateLag()
+    {
+      return aggregateLag;
+    }
+
+    @JsonProperty
+    public DateTime getOffsetsLastUpdated()
+    {
+      return offsetsLastUpdated;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "{" +
+             "dataSource='" + dataSource + '\'' +
+             ", topic='" + topic + '\'' +
+             ", partitions=" + partitions +
+             ", replicas=" + replicas +
+             ", durationSeconds=" + durationSeconds +
+             ", active=" + activeTasks +
+             ", publishing=" + publishingTasks +
+             (latestOffsets != null ? ", latestOffsets=" + latestOffsets : "") +
+             (minimumLag != null ? ", minimumLag=" + minimumLag : "") +
+             (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") +
+             (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") +
+             '}';
+    }
+  }
+
+  private final KafkaSupervisorReportPayload payload;
+
+  @JsonCreator
+  public KafkaSupervisorReport(@JsonProperty("id") String id,
+          @JsonProperty("generationTime")DateTime generationTime,
+          @JsonProperty("payload") KafkaSupervisorReportPayload payload){
+    super(id, generationTime);
+    this.payload = payload;
+  }
+
+  public KafkaSupervisorReport(
+      String dataSource,
+      DateTime generationTime,
+      String topic,
+      Integer partitions,
+      Integer replicas,
+      Long durationSeconds,
+      @Nullable Map<Integer, Long> latestOffsets,
+      @Nullable Map<Integer, Long> minimumLag,
+      @Nullable Long aggregateLag,
+      @Nullable DateTime offsetsLastUpdated
+  ) {
+    this(dataSource, generationTime, new KafkaSupervisorReportPayload(
+            dataSource,
+            topic,
+            partitions,
+            replicas,
+            durationSeconds,
+            latestOffsets,
+            minimumLag,
+            aggregateLag,
+            offsetsLastUpdated
+    ));
+  }
+
+  @Override
+  public KafkaSupervisorReportPayload getPayload()
+  {
+    return payload;
+  }
+
+  public void addTask(TaskReportData data)
+  {
+    if (data.getType().equals(TaskReportData.TaskType.ACTIVE)) {
+      payload.activeTasks.add(data);
+    } else if (data.getType().equals(TaskReportData.TaskType.PUBLISHING)) {
+      payload.publishingTasks.add(data);
+    } else {
+      throw new IAE("Unknown task type [%s]", data.getType().name());
+    }
+  }
+
+  @Override
+  public String toString()
+  {
+    return "{" +
+           "id='" + getId() + '\'' +
+           ", generationTime=" + getGenerationTime() +
+           ", payload=" + payload +
+           '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
new file mode 100644
index 0000000..94a3f7f
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class TaskReportData
+{
+  public enum TaskType
+  {
+    ACTIVE, PUBLISHING, UNKNOWN
+  }
+
+  private final String id;
+  private final Map<Integer, Long> startingOffsets;
+  private final DateTime startTime;
+  private final Long remainingSeconds;
+  private final TaskType type;
+  private final Map<Integer, Long> currentOffsets;
+  private final Map<Integer, Long> lag;
+
+  public TaskReportData(
+      String id,
+      @Nullable Map<Integer, Long> startingOffsets,
+      @Nullable Map<Integer, Long> currentOffsets,
+      DateTime startTime,
+      Long remainingSeconds,
+      TaskType type,
+      @Nullable Map<Integer, Long> lag
+  )
+  {
+    this.id = id;
+    this.startingOffsets = startingOffsets;
+    this.currentOffsets = currentOffsets;
+    this.startTime = startTime;
+    this.remainingSeconds = remainingSeconds;
+    this.type = type;
+    this.lag = lag;
+  }
+
+  @JsonProperty
+  public String getId()
+  {
+    return id;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<Integer, Long> getStartingOffsets()
+  {
+    return startingOffsets;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<Integer, Long> getCurrentOffsets()
+  {
+    return currentOffsets;
+  }
+
+  @JsonProperty
+  public DateTime getStartTime()
+  {
+    return startTime;
+  }
+
+  @JsonProperty
+  public Long getRemainingSeconds()
+  {
+    return remainingSeconds;
+  }
+
+  @JsonProperty
+  public TaskType getType()
+  {
+    return type;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<Integer, Long> getLag()
+  {
+    return lag;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "{" +
+           "id='" + id + '\'' +
+           (startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") +
+           (currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") +
+           ", startTime=" + startTime +
+           ", remainingSeconds=" + remainingSeconds +
+           (lag != null ? ", lag=" + lag : "") +
+           '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index b3c95eb..c9c5054 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
 import java.io.BufferedWriter;
@@ -78,7 +77,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
 import org.apache.hadoop.hive.metastore.StatObjectConverter;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -166,6 +164,7 @@ import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
 import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
@@ -3757,6 +3756,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       NotNullConstraint nnInfo = null;
       DefaultConstraint dInfo = null;
       CheckConstraint cInfo = null;
+      StorageHandlerInfo storageHandlerInfo = null;
       if (descTbl.isExt() || descTbl.isFormatted()) {
         pkInfo = db.getPrimaryKeys(tbl.getDbName(), tbl.getTableName());
         fkInfo = db.getForeignKeys(tbl.getDbName(), tbl.getTableName());
@@ -3764,6 +3764,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         nnInfo = db.getNotNullConstraints(tbl.getDbName(), tbl.getTableName());
         dInfo = db.getDefaultConstraints(tbl.getDbName(), tbl.getTableName());
         cInfo = db.getCheckConstraints(tbl.getDbName(), tbl.getTableName());
+        storageHandlerInfo = db.getStorageHandlerInfo(tbl);
       }
       fixDecimalColumnTypeName(cols);
       // In case the query is served by HiveServer2, don't pad it with spaces,
@@ -3772,7 +3773,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       formatter.describeTable(outStream, colPath, tableName, tbl, part,
           cols, descTbl.isFormatted(), descTbl.isExt(),
           isOutputPadded, colStats,
-          pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo);
+          pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo, storageHandlerInfo);
 
       LOG.debug("DDLTask: written data for {}", tableName);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 3218f96..64b3f83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
 import javax.jdo.JDODataStoreException;
 
 import com.google.common.collect.ImmutableList;
@@ -95,7 +96,6 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
-import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HdfsUtils;
@@ -4166,29 +4166,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
   private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException {
 
     HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
-        @Override
-        public HiveMetaHook getHook(
-          org.apache.hadoop.hive.metastore.api.Table tbl)
-          throws MetaException {
-
-          try {
-            if (tbl == null) {
-              return null;
-            }
-            HiveStorageHandler storageHandler =
-              HiveUtils.getStorageHandler(conf,
-                tbl.getParameters().get(META_TABLE_STORAGE));
-            if (storageHandler == null) {
-              return null;
-            }
-            return storageHandler.getMetaHook();
-          } catch (HiveException ex) {
-            LOG.error(StringUtils.stringifyException(ex));
-            throw new MetaException(
-              "Failed to load storage handler:  " + ex.getMessage());
-          }
-        }
-      };
+      @Override
+      public HiveMetaHook getHook(
+              org.apache.hadoop.hive.metastore.api.Table tbl)
+              throws MetaException {
+        HiveStorageHandler storageHandler = createStorageHandler(tbl);
+        return storageHandler == null ? null : storageHandler.getMetaHook();
+      }
+    };
 
     if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
       return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded);
@@ -4198,6 +4183,22 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
+  @Nullable
+  private HiveStorageHandler createStorageHandler(org.apache.hadoop.hive.metastore.api.Table tbl) throws MetaException {
+    try {
+      if (tbl == null) {
+        return null;
+      }
+      HiveStorageHandler storageHandler =
+              HiveUtils.getStorageHandler(conf, tbl.getParameters().get(META_TABLE_STORAGE));
+      return storageHandler;
+    } catch (HiveException ex) {
+      LOG.error(StringUtils.stringifyException(ex));
+      throw new MetaException(
+              "Failed to load storage handler:  " + ex.getMessage());
+    }
+  }
+
   public static class SchemaException extends MetaException {
     private static final long serialVersionUID = 1L;
     public SchemaException(String message) {
@@ -5115,4 +5116,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
   }
-};
+
+  @Nullable
+  public StorageHandlerInfo getStorageHandlerInfo(Table table)
+      throws HiveException {
+    try {
+      HiveStorageHandler storageHandler = createStorageHandler(table.getTTable());
+      return storageHandler == null ? null : storageHandler.getStorageHandlerInfo(table.getTTable());
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 99bb9f6..1696243 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -149,7 +152,19 @@ public interface HiveStorageHandler extends Configurable {
    * Called just before submitting MapReduce job.
    *
    * @param tableDesc descriptor for the table being accessed
-   * @param JobConf jobConf for MapReduce job
+   * @param jobConf jobConf for MapReduce job
    */
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
+
+  /**
+   * Used to fetch runtime information about the storage handler during a DESCRIBE EXTENDED statement.
+   *
+   * @param table table definition
+   * @return StorageHandlerInfo containing runtime information about the storage handler,
+   * or {@code null} if the storage handler chooses not to provide any runtime information.
+   */
+  public default StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException
+  {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
new file mode 100644
index 0000000..dbc44a6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * StorageHandlerInfo is an interface used to provide runtime information associated with a storage handler.
+ */
+public interface StorageHandlerInfo extends Serializable {
+  /**
+   * Called from the DESCRIBE EXTENDED statement when the formatter is the text formatter.
+   * @return Formatted StorageHandlerInfo as String
+   */
+  String formatAsText();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
index cd70eee..c21967c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
 import org.codehaus.jackson.JsonGenerator;
@@ -117,7 +118,7 @@ public class JsonMetaDataFormatter implements MetaDataFormatter {
       boolean isOutputPadded, List<ColumnStatisticsObj> colStats,
       PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo,
       UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo,
-                            CheckConstraint cInfo) throws HiveException {
+      CheckConstraint cInfo, StorageHandlerInfo storageHandlerInfo) throws HiveException {
     MapBuilder builder = MapBuilder.create();
     builder.put("columns", makeColsUnformatted(cols));
 
@@ -146,6 +147,9 @@ public class JsonMetaDataFormatter implements MetaDataFormatter {
       if (cInfo != null && !cInfo.getCheckConstraints().isEmpty()) {
         builder.put("checkConstraintInfo", cInfo);
       }
+      if(storageHandlerInfo != null) {
+        builder.put("storageHandlerInfo", storageHandlerInfo.toString());
+      }
     }
 
     asJson(out, builder.build());

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
index ed2cdd1..d15016c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
 
@@ -91,7 +92,8 @@ public interface MetaDataFormatter {
       boolean isFormatted, boolean isExt,
       boolean isOutputPadded, List<ColumnStatisticsObj> colStats,
       PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo,
-      UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo)
+      UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo,
+      StorageHandlerInfo storageHandlerInfo)
           throws HiveException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
index 63a2969..2529923 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,7 +133,8 @@ class TextMetaDataFormatter implements MetaDataFormatter {
       boolean isFormatted, boolean isExt,
       boolean isOutputPadded, List<ColumnStatisticsObj> colStats,
       PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo,
-      UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo)
+      UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo,
+      StorageHandlerInfo storageHandlerInfo)
         throws HiveException {
     try {
       List<FieldSchema> partCols = tbl.isPartitioned() ? tbl.getPartCols() : null;
@@ -252,6 +254,13 @@ class TextMetaDataFormatter implements MetaDataFormatter {
               outStream.write(terminator);
             }
           }
+
+          if (storageHandlerInfo!= null) {
+            outStream.write(("StorageHandlerInfo").getBytes("UTF-8"));
+            outStream.write(terminator);
+            outStream.write(storageHandlerInfo.formatAsText().getBytes("UTF-8"));
+            outStream.write(terminator);
+          }
         }
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
index f4fd2a6..4c30cdd 100644
--- a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
@@ -9,7 +9,7 @@ CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, la
         "druid.kafka.ingestion.useEarliestOffset" = "true",
         "druid.kafka.ingestion.maxRowsInMemory" = "5",
         "druid.kafka.ingestion.startDelay" = "PT1S",
-        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.taskDuration" = "PT20S",
         "druid.kafka.ingestion.period" = "PT1S"
         );
 
@@ -18,7 +18,7 @@ ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'
 !curl -ss http://localhost:8081/druid/indexer/v1/supervisor;
 
 -- Sleep for some time for ingestion tasks to ingest events
-!sleep 50;
+!sleep 60;
 
 DESCRIBE druid_kafka_test;
 DESCRIBE EXTENDED druid_kafka_test;
@@ -32,7 +32,7 @@ Select page FROM druid_kafka_test order by page;
 ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');
 
 -- Sleep for some time for ingestion tasks to ingest events
-!sleep 50;
+!sleep 60;
 
 DESCRIBE druid_kafka_test;
 DESCRIBE EXTENDED druid_kafka_test;

http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
index 6f553fa..c2cc249 100644
--- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
@@ -8,7 +8,7 @@ PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `
         "druid.kafka.ingestion.useEarliestOffset" = "true",
         "druid.kafka.ingestion.maxRowsInMemory" = "5",
         "druid.kafka.ingestion.startDelay" = "PT1S",
-        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.taskDuration" = "PT20S",
         "druid.kafka.ingestion.period" = "PT1S"
         )
 PREHOOK: type: CREATETABLE
@@ -24,7 +24,7 @@ POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string,
         "druid.kafka.ingestion.useEarliestOffset" = "true",
         "druid.kafka.ingestion.maxRowsInMemory" = "5",
         "druid.kafka.ingestion.startDelay" = "PT1S",
-        "druid.kafka.ingestion.taskDuration" = "PT30S",
+        "druid.kafka.ingestion.taskDuration" = "PT20S",
         "druid.kafka.ingestion.period" = "PT1S"
         )
 POSTHOOK: type: CREATETABLE
@@ -65,6 +65,15 @@ added               	int                 	from deserializer
 deleted             	int                 	from deserializer   
 	 	 
 #### A masked pattern was here ####
+StorageHandlerInfo	 	 
+Druid Storage Handler Runtime Status for default.druid_kafka_test	 	 
+kafkaPartitions=1	 	 
+activeTasks=[]	 	 
+publishingTasks=[]	 	 
+latestOffsets={0=10}	 	 
+minimumLag={}	 	 
+aggregateLag=0	 	 
+#### A masked pattern was here ####
 PREHOOK: query: Select count(*) FROM druid_kafka_test
 PREHOOK: type: QUERY
 PREHOOK: Input: default@druid_kafka_test
@@ -126,6 +135,15 @@ added               	int                 	from deserializer
 deleted             	int                 	from deserializer   
 	 	 
 #### A masked pattern was here ####
+StorageHandlerInfo	 	 
+Druid Storage Handler Runtime Status for default.druid_kafka_test	 	 
+kafkaPartitions=1	 	 
+activeTasks=[]	 	 
+publishingTasks=[]	 	 
+latestOffsets={0=10}	 	 
+minimumLag={}	 	 
+aggregateLag=0	 	 
+#### A masked pattern was here ####
 PREHOOK: query: Select count(*) FROM druid_kafka_test
 PREHOOK: type: QUERY
 PREHOOK: Input: default@druid_kafka_test
@@ -331,7 +349,7 @@ STAGE PLANS:
                     druid.kafka.ingestion.maxRowsInMemory 5
                     druid.kafka.ingestion.period PT1S
                     druid.kafka.ingestion.startDelay PT1S
-                    druid.kafka.ingestion.taskDuration PT30S
+                    druid.kafka.ingestion.taskDuration PT20S
                     druid.kafka.ingestion.useEarliestOffset true
                     druid.query.granularity MINUTE
                     druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
@@ -370,7 +388,7 @@ STAGE PLANS:
                       druid.kafka.ingestion.maxRowsInMemory 5
                       druid.kafka.ingestion.period PT1S
                       druid.kafka.ingestion.startDelay PT1S
-                      druid.kafka.ingestion.taskDuration PT30S
+                      druid.kafka.ingestion.taskDuration PT20S
                       druid.kafka.ingestion.useEarliestOffset true
                       druid.query.granularity MINUTE
                       druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}