You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/05/08 23:06:04 UTC
[2/2] hive git commit: HIVE-19173 : Add Storage Handler runtime
information as part of DESCRIBE EXTENDED (Nishant Bangarwa via Ashutosh
Chauhan)
HIVE-19173 : Add Storage Handler runtime information as part of DESCRIBE EXTENDED (Nishant Bangarwa via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa040c5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa040c5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa040c5b
Branch: refs/heads/branch-3
Commit: aa040c5bfcea2257b4aa89f39832a7d6198a43a0
Parents: b135724
Author: Nishant Bangarwa <ni...@gmail.com>
Authored: Fri Apr 13 09:43:00 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Tue May 8 16:05:54 2018 -0700
----------------------------------------------------------------------
.../hadoop/hive/druid/DruidStorageHandler.java | 70 +++++-
.../hive/druid/DruidStorageHandlerInfo.java | 72 ++++++
.../hive/druid/json/KafkaSupervisorReport.java | 231 +++++++++++++++++++
.../hadoop/hive/druid/json/TaskReportData.java | 125 ++++++++++
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 7 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 62 +++--
.../hive/ql/metadata/HiveStorageHandler.java | 17 +-
.../hive/ql/metadata/StorageHandlerInfo.java | 38 +++
.../formatting/JsonMetaDataFormatter.java | 6 +-
.../metadata/formatting/MetaDataFormatter.java | 4 +-
.../formatting/TextMetaDataFormatter.java | 11 +-
.../clientpositive/druidkafkamini_basic.q | 6 +-
.../druid/druidkafkamini_basic.q.out | 26 ++-
13 files changed, 633 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index bc08bd8..3e707e3 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -35,6 +35,7 @@ import com.metamx.http.client.HttpClientInit;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
+
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
@@ -59,6 +60,7 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.storage.hdfs.HdfsDataSegmentPusher;
import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import io.druid.timeline.DataSegment;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -69,6 +71,7 @@ import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;
import org.apache.hadoop.hive.druid.security.KerberosHttpClient;
@@ -82,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -94,6 +98,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
+
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
@@ -116,6 +121,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
/**
@@ -454,7 +461,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
console.printInfo("Druid Kafka Ingestion Reset successful.");
} else {
throw new IOException(String
- .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
+ .format("Unable to reset Kafka Ingestion Druid status [%d] full response [%s]",
response.getStatus().getCode(), response.getContent()));
}
} catch (Exception e) {
@@ -486,7 +493,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
}
- public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
+ private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
// Stop Kafka Ingestion first
final String overlordAddress = Preconditions.checkNotNull(HiveConf
.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
@@ -512,7 +519,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
return null;
} else {
throw new IOException(String
- .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]",
+ .format("Unable to fetch Kafka Ingestion Spec from Druid status [%d] full response [%s]",
response.getStatus().getCode(), response.getContent()));
}
} catch (Exception e) {
@@ -521,6 +528,46 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
}
/**
+ * Fetches the Kafka supervisor status report for the table's datasource from the Druid overlord.
+ * @param table table whose Druid datasource name identifies the supervisor to query
+ * @return supervisor report, or null if no supervisor is found, the overlord returns an error, or the call fails.
+ */
+ @Nullable
+ private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) {
+ final String overlordAddress = Preconditions.checkNotNull(HiveConf
+ .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS),
+ "Druid Overlord Address is null");
+ String dataSourceName = Preconditions
+ .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE),
+ "Druid Datasource name is null");
+ try {
+ StatusResponseHolder response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET,
+ new URL(String
+ .format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress,
+ dataSourceName))),
+ new StatusResponseHandler(
+ Charset.forName("UTF-8"))).get(),
+ input -> input instanceof IOException,
+ getMaxRetryCount());
+ if (response.getStatus().equals(HttpResponseStatus.OK)) {
+ return DruidStorageHandlerUtils.JSON_MAPPER
+ .readValue(response.getContent(), KafkaSupervisorReport.class);
+ // Druid Returns 400 Bad Request when not found.
+ } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
+ LOG.info(String.format("No Kafka Supervisor found for datasource[%s]", dataSourceName)); // printf-style template: format eagerly
+ return null;
+ } else {
+ LOG.error(String.format("Unable to fetch Kafka Supervisor status [%d] full response [%s]",
+ response.getStatus().getCode(), response.getContent()));
+ return null;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while fetching kafka supervisor report from druid", e);
+ return null;
+ }
+ }
+
+ /**
* Creates metadata moves then commit the Segment's metadata to Druid metadata store in one TxN
*
* @param table Hive table
@@ -995,6 +1042,7 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
updateKafkaIngestion(table);
}
}
+
private static <T> Boolean getBooleanProperty(Table table, String propertyName) {
String val = getTableProperty(table, propertyName);
if (val == null) {
@@ -1057,4 +1105,20 @@ public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStor
private int getMaxRetryCount() {
return HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
}
+
+ @Override
+ public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
+ if(isKafkaStreamingTable(table)){
+ KafkaSupervisorReport kafkaSupervisorReport = fetchKafkaSupervisorReport(table);
+ if(kafkaSupervisorReport == null){
+ return DruidStorageHandlerInfo.UNREACHABLE;
+ }
+ return new DruidStorageHandlerInfo(kafkaSupervisorReport);
+ }
+ else
+ // TODO: Currently we do not expose any runtime info for non-streaming tables.
+ // In future, extend this to add more information regarding table status.
+ // e.g. Total size of segments in druid, loadstatus of table on historical nodes etc.
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java
new file mode 100644
index 0000000..f0e1750
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import io.druid.java.util.common.StringUtils;
+
+import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
+
+/**
+ * DruidStorageHandlerInfo provides runtime information for DruidStorageHandler.
+ */
+@SuppressWarnings("serial")
+public class DruidStorageHandlerInfo implements StorageHandlerInfo {
+
+ public static final StorageHandlerInfo UNREACHABLE = new StorageHandlerInfo() {
+ @Override
+ public String formatAsText() {
+ return "Druid Overlord is Unreachable, Runtime Status : unknown";
+ }
+ };
+
+ private final KafkaSupervisorReport kafkaSupervisorReport;
+
+ DruidStorageHandlerInfo(KafkaSupervisorReport kafkaSupervisorReport) {
+ this.kafkaSupervisorReport = kafkaSupervisorReport;
+ }
+
+ @Override
+ public String formatAsText() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Druid Storage Handler Runtime Status for " + kafkaSupervisorReport.getId());
+ sb.append("\n");
+ sb.append("kafkaPartitions=" + kafkaSupervisorReport.getPayload().getPartitions());
+ sb.append("\n");
+ sb.append("activeTasks=" + kafkaSupervisorReport.getPayload().getActiveTasks());
+ sb.append("\n");
+ sb.append("publishingTasks=" + kafkaSupervisorReport.getPayload().getPublishingTasks());
+ if (kafkaSupervisorReport.getPayload().getLatestOffsets() != null) {
+ sb.append("\n");
+ sb.append("latestOffsets=" + kafkaSupervisorReport.getPayload().getLatestOffsets());
+ }
+ if (kafkaSupervisorReport.getPayload().getMinimumLag() != null) {
+ sb.append("\n");
+ sb.append("minimumLag=" + kafkaSupervisorReport.getPayload().getMinimumLag());
+ }
+ if (kafkaSupervisorReport.getPayload().getAggregateLag() != null) {
+ sb.append("\n");
+ sb.append("aggregateLag=" + kafkaSupervisorReport.getPayload().getAggregateLag());
+ }
+ if (kafkaSupervisorReport.getPayload().getOffsetsLastUpdated() != null) {
+ sb.append("\n");
+ sb.append("lastUpdateTime=" + kafkaSupervisorReport.getPayload().getOffsetsLastUpdated());
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
new file mode 100644
index 0000000..5a6756e
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import io.druid.guice.annotations.Json;
+import io.druid.indexing.overlord.supervisor.SupervisorReport;
+import io.druid.java.util.common.IAE;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+
+import org.joda.time.DateTime;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class KafkaSupervisorReport extends SupervisorReport
+{
+ public static class KafkaSupervisorReportPayload
+ {
+ private final String dataSource;
+ private final String topic;
+ private final Integer partitions;
+ private final Integer replicas;
+ private final Long durationSeconds;
+ private final List<TaskReportData> activeTasks;
+ private final List<TaskReportData> publishingTasks;
+ private final Map<Integer, Long> latestOffsets;
+ private final Map<Integer, Long> minimumLag;
+ private final Long aggregateLag;
+ private final DateTime offsetsLastUpdated;
+
+ @JsonCreator
+ public KafkaSupervisorReportPayload(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("topic") String topic,
+ @JsonProperty("partitions") Integer partitions,
+ @JsonProperty("replicas") Integer replicas,
+ @JsonProperty("durationSeconds") Long durationSeconds,
+ @Nullable @JsonProperty("latestOffsets") Map<Integer, Long> latestOffsets,
+ @Nullable @JsonProperty("minimumLag") Map<Integer, Long> minimumLag,
+ @Nullable @JsonProperty("aggregateLag") Long aggregateLag,
+ @Nullable @JsonProperty("offsetsLastUpdated") DateTime offsetsLastUpdated
+ )
+ {
+ this.dataSource = dataSource;
+ this.topic = topic;
+ this.partitions = partitions;
+ this.replicas = replicas;
+ this.durationSeconds = durationSeconds;
+ this.activeTasks = Lists.newArrayList();
+ this.publishingTasks = Lists.newArrayList();
+ this.latestOffsets = latestOffsets;
+ this.minimumLag = minimumLag;
+ this.aggregateLag = aggregateLag;
+ this.offsetsLastUpdated = offsetsLastUpdated;
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public String getTopic()
+ {
+ return topic;
+ }
+
+ @JsonProperty
+ public Integer getPartitions()
+ {
+ return partitions;
+ }
+
+ @JsonProperty
+ public Integer getReplicas()
+ {
+ return replicas;
+ }
+
+ @JsonProperty
+ public Long getDurationSeconds()
+ {
+ return durationSeconds;
+ }
+
+ @JsonProperty
+ public List<TaskReportData> getActiveTasks()
+ {
+ return activeTasks;
+ }
+
+ @JsonProperty
+ public List<TaskReportData> getPublishingTasks()
+ {
+ return publishingTasks;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<Integer, Long> getLatestOffsets()
+ {
+ return latestOffsets;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<Integer, Long> getMinimumLag()
+ {
+ return minimumLag;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Long getAggregateLag()
+ {
+ return aggregateLag;
+ }
+
+ @JsonProperty
+ public DateTime getOffsetsLastUpdated()
+ {
+ return offsetsLastUpdated;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" +
+ "dataSource='" + dataSource + '\'' +
+ ", topic='" + topic + '\'' +
+ ", partitions=" + partitions +
+ ", replicas=" + replicas +
+ ", durationSeconds=" + durationSeconds +
+ ", active=" + activeTasks +
+ ", publishing=" + publishingTasks +
+ (latestOffsets != null ? ", latestOffsets=" + latestOffsets : "") +
+ (minimumLag != null ? ", minimumLag=" + minimumLag : "") +
+ (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") +
+ (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") +
+ '}';
+ }
+ }
+
+ private final KafkaSupervisorReportPayload payload;
+
+ @JsonCreator
+ public KafkaSupervisorReport(@JsonProperty("id") String id,
+ @JsonProperty("generationTime")DateTime generationTime,
+ @JsonProperty("payload") KafkaSupervisorReportPayload payload){
+ super(id, generationTime);
+ this.payload = payload;
+ }
+
+ public KafkaSupervisorReport(
+ String dataSource,
+ DateTime generationTime,
+ String topic,
+ Integer partitions,
+ Integer replicas,
+ Long durationSeconds,
+ @Nullable Map<Integer, Long> latestOffsets,
+ @Nullable Map<Integer, Long> minimumLag,
+ @Nullable Long aggregateLag,
+ @Nullable DateTime offsetsLastUpdated
+ ) {
+ this(dataSource, generationTime, new KafkaSupervisorReportPayload(
+ dataSource,
+ topic,
+ partitions,
+ replicas,
+ durationSeconds,
+ latestOffsets,
+ minimumLag,
+ aggregateLag,
+ offsetsLastUpdated
+ ));
+ }
+
+ @Override
+ public KafkaSupervisorReportPayload getPayload()
+ {
+ return payload;
+ }
+
+ public void addTask(TaskReportData data)
+ {
+ if (data.getType().equals(TaskReportData.TaskType.ACTIVE)) {
+ payload.activeTasks.add(data);
+ } else if (data.getType().equals(TaskReportData.TaskType.PUBLISHING)) {
+ payload.publishingTasks.add(data);
+ } else {
+ throw new IAE("Unknown task type [%s]", data.getType().name());
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" +
+ "id='" + getId() + '\'' +
+ ", generationTime=" + getGenerationTime() +
+ ", payload=" + payload +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
new file mode 100644
index 0000000..94a3f7f
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class TaskReportData
+{
+ public enum TaskType
+ {
+ ACTIVE, PUBLISHING, UNKNOWN
+ }
+
+ private final String id;
+ private final Map<Integer, Long> startingOffsets;
+ private final DateTime startTime;
+ private final Long remainingSeconds;
+ private final TaskType type;
+ private final Map<Integer, Long> currentOffsets;
+ private final Map<Integer, Long> lag;
+
+ public TaskReportData(
+ String id,
+ @Nullable Map<Integer, Long> startingOffsets,
+ @Nullable Map<Integer, Long> currentOffsets,
+ DateTime startTime,
+ Long remainingSeconds,
+ TaskType type,
+ @Nullable Map<Integer, Long> lag
+ )
+ {
+ this.id = id;
+ this.startingOffsets = startingOffsets;
+ this.currentOffsets = currentOffsets;
+ this.startTime = startTime;
+ this.remainingSeconds = remainingSeconds;
+ this.type = type;
+ this.lag = lag;
+ }
+
+ @JsonProperty
+ public String getId()
+ {
+ return id;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<Integer, Long> getStartingOffsets()
+ {
+ return startingOffsets;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<Integer, Long> getCurrentOffsets()
+ {
+ return currentOffsets;
+ }
+
+ @JsonProperty
+ public DateTime getStartTime()
+ {
+ return startTime;
+ }
+
+ @JsonProperty
+ public Long getRemainingSeconds()
+ {
+ return remainingSeconds;
+ }
+
+ @JsonProperty
+ public TaskType getType()
+ {
+ return type;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<Integer, Long> getLag()
+ {
+ return lag;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" +
+ "id='" + id + '\'' +
+ (startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") +
+ (currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") +
+ ", startTime=" + startTime +
+ ", remainingSeconds=" + remainingSeconds +
+ (lag != null ? ", lag=" + lag : "") +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index b3c95eb..c9c5054 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.exec;
import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
import java.io.BufferedWriter;
@@ -78,7 +77,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.PartitionDropOptions;
import org.apache.hadoop.hive.metastore.StatObjectConverter;
import org.apache.hadoop.hive.metastore.TableType;
@@ -166,6 +164,7 @@ import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils;
@@ -3757,6 +3756,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
NotNullConstraint nnInfo = null;
DefaultConstraint dInfo = null;
CheckConstraint cInfo = null;
+ StorageHandlerInfo storageHandlerInfo = null;
if (descTbl.isExt() || descTbl.isFormatted()) {
pkInfo = db.getPrimaryKeys(tbl.getDbName(), tbl.getTableName());
fkInfo = db.getForeignKeys(tbl.getDbName(), tbl.getTableName());
@@ -3764,6 +3764,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
nnInfo = db.getNotNullConstraints(tbl.getDbName(), tbl.getTableName());
dInfo = db.getDefaultConstraints(tbl.getDbName(), tbl.getTableName());
cInfo = db.getCheckConstraints(tbl.getDbName(), tbl.getTableName());
+ storageHandlerInfo = db.getStorageHandlerInfo(tbl);
}
fixDecimalColumnTypeName(cols);
// In case the query is served by HiveServer2, don't pad it with spaces,
@@ -3772,7 +3773,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
formatter.describeTable(outStream, colPath, tableName, tbl, part,
cols, descTbl.isFormatted(), descTbl.isExt(),
isOutputPadded, colStats,
- pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo);
+ pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo, storageHandlerInfo);
LOG.debug("DDLTask: written data for {}", tableName);
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 3218f96..64b3f83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import javax.jdo.JDODataStoreException;
import com.google.common.collect.ImmutableList;
@@ -95,7 +96,6 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
import org.apache.hadoop.hive.common.log.InPlaceUpdate;
-import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HdfsUtils;
@@ -4166,29 +4166,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException {
HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
- @Override
- public HiveMetaHook getHook(
- org.apache.hadoop.hive.metastore.api.Table tbl)
- throws MetaException {
-
- try {
- if (tbl == null) {
- return null;
- }
- HiveStorageHandler storageHandler =
- HiveUtils.getStorageHandler(conf,
- tbl.getParameters().get(META_TABLE_STORAGE));
- if (storageHandler == null) {
- return null;
- }
- return storageHandler.getMetaHook();
- } catch (HiveException ex) {
- LOG.error(StringUtils.stringifyException(ex));
- throw new MetaException(
- "Failed to load storage handler: " + ex.getMessage());
- }
- }
- };
+ @Override
+ public HiveMetaHook getHook(
+ org.apache.hadoop.hive.metastore.api.Table tbl)
+ throws MetaException {
+ HiveStorageHandler storageHandler = createStorageHandler(tbl);
+ return storageHandler == null ? null : storageHandler.getMetaHook();
+ }
+ };
if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) {
return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded);
@@ -4198,6 +4183,22 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
+ @Nullable
+ private HiveStorageHandler createStorageHandler(org.apache.hadoop.hive.metastore.api.Table tbl) throws MetaException {
+ try {
+ if (tbl == null) {
+ return null;
+ }
+ HiveStorageHandler storageHandler =
+ HiveUtils.getStorageHandler(conf, tbl.getParameters().get(META_TABLE_STORAGE));
+ return storageHandler;
+ } catch (HiveException ex) {
+ LOG.error(StringUtils.stringifyException(ex));
+ throw new MetaException(
+ "Failed to load storage handler: " + ex.getMessage());
+ }
+ }
+
public static class SchemaException extends MetaException {
private static final long serialVersionUID = 1L;
public SchemaException(String message) {
@@ -5115,4 +5116,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
throw new HiveException(e);
}
}
-};
+
+ @Nullable
+ public StorageHandlerInfo getStorageHandlerInfo(Table table)
+ throws HiveException {
+ try {
+ HiveStorageHandler storageHandler = createStorageHandler(table.getTTable());
+ return storageHandler == null ? null : storageHandler.getStorageHandlerInfo(table.getTTable());
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 99bb9f6..1696243 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hive.ql.metadata;
+import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -149,7 +152,19 @@ public interface HiveStorageHandler extends Configurable {
* Called just before submitting MapReduce job.
*
* @param tableDesc descriptor for the table being accessed
- * @param JobConf jobConf for MapReduce job
+ * @param jobConf jobConf for MapReduce job
*/
public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
+
+ /**
+ * Used to fetch runtime information about storage handler during DESCRIBE EXTENDED statement
+ *
+ * @param table table definition
+ * @return StorageHandlerInfo containing runtime information about storage handler
+ * OR `null` if the storage handler chooses not to provide any runtime information.
+ */
+ public default StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException
+ {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
new file mode 100644
index 0000000..dbc44a6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * StorageHandlerInfo is an interface used to provide runtime information associated with a storage handler.
+ */
+public interface StorageHandlerInfo extends Serializable {
+ /**
+ * Called for a DESCRIBE EXTENDED statement when the formatter is the text formatter.
+ * @return Formatted StorageHandlerInfo as String
+ */
+ String formatAsText();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
index cd70eee..c21967c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
import org.codehaus.jackson.JsonGenerator;
@@ -117,7 +118,7 @@ public class JsonMetaDataFormatter implements MetaDataFormatter {
boolean isOutputPadded, List<ColumnStatisticsObj> colStats,
PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo,
UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo,
- CheckConstraint cInfo) throws HiveException {
+ CheckConstraint cInfo, StorageHandlerInfo storageHandlerInfo) throws HiveException {
MapBuilder builder = MapBuilder.create();
builder.put("columns", makeColsUnformatted(cols));
@@ -146,6 +147,9 @@ public class JsonMetaDataFormatter implements MetaDataFormatter {
if (cInfo != null && !cInfo.getCheckConstraints().isEmpty()) {
builder.put("checkConstraintInfo", cInfo);
}
+ if(storageHandlerInfo != null) {
+ builder.put("storageHandlerInfo", storageHandlerInfo.toString());
+ }
}
asJson(out, builder.build());
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
index ed2cdd1..d15016c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
@@ -91,7 +92,8 @@ public interface MetaDataFormatter {
boolean isFormatted, boolean isExt,
boolean isOutputPadded, List<ColumnStatisticsObj> colStats,
PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo,
- UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo)
+ UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo,
+ StorageHandlerInfo storageHandlerInfo)
throws HiveException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
index 63a2969..2529923 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hive.common.util.HiveStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,7 +133,8 @@ class TextMetaDataFormatter implements MetaDataFormatter {
boolean isFormatted, boolean isExt,
boolean isOutputPadded, List<ColumnStatisticsObj> colStats,
PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo,
- UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo)
+ UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo,
+ StorageHandlerInfo storageHandlerInfo)
throws HiveException {
try {
List<FieldSchema> partCols = tbl.isPartitioned() ? tbl.getPartCols() : null;
@@ -252,6 +254,13 @@ class TextMetaDataFormatter implements MetaDataFormatter {
outStream.write(terminator);
}
}
+
+ if (storageHandlerInfo!= null) {
+ outStream.write(("StorageHandlerInfo").getBytes("UTF-8"));
+ outStream.write(terminator);
+ outStream.write(storageHandlerInfo.formatAsText().getBytes("UTF-8"));
+ outStream.write(terminator);
+ }
}
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
index f4fd2a6..4c30cdd 100644
--- a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
+++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q
@@ -9,7 +9,7 @@ CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, la
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "5",
"druid.kafka.ingestion.startDelay" = "PT1S",
- "druid.kafka.ingestion.taskDuration" = "PT30S",
+ "druid.kafka.ingestion.taskDuration" = "PT20S",
"druid.kafka.ingestion.period" = "PT1S"
);
@@ -18,7 +18,7 @@ ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START'
!curl -ss http://localhost:8081/druid/indexer/v1/supervisor;
-- Sleep for some time for ingestion tasks to ingest events
-!sleep 50;
+!sleep 60;
DESCRIBE druid_kafka_test;
DESCRIBE EXTENDED druid_kafka_test;
@@ -32,7 +32,7 @@ Select page FROM druid_kafka_test order by page;
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');
-- Sleep for some time for ingestion tasks to ingest events
-!sleep 50;
+!sleep 60;
DESCRIBE druid_kafka_test;
DESCRIBE EXTENDED druid_kafka_test;
http://git-wip-us.apache.org/repos/asf/hive/blob/aa040c5b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
index 6f553fa..c2cc249 100644
--- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out
@@ -8,7 +8,7 @@ PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "5",
"druid.kafka.ingestion.startDelay" = "PT1S",
- "druid.kafka.ingestion.taskDuration" = "PT30S",
+ "druid.kafka.ingestion.taskDuration" = "PT20S",
"druid.kafka.ingestion.period" = "PT1S"
)
PREHOOK: type: CREATETABLE
@@ -24,7 +24,7 @@ POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string,
"druid.kafka.ingestion.useEarliestOffset" = "true",
"druid.kafka.ingestion.maxRowsInMemory" = "5",
"druid.kafka.ingestion.startDelay" = "PT1S",
- "druid.kafka.ingestion.taskDuration" = "PT30S",
+ "druid.kafka.ingestion.taskDuration" = "PT20S",
"druid.kafka.ingestion.period" = "PT1S"
)
POSTHOOK: type: CREATETABLE
@@ -65,6 +65,15 @@ added int from deserializer
deleted int from deserializer
#### A masked pattern was here ####
+StorageHandlerInfo
+Druid Storage Handler Runtime Status for default.druid_kafka_test
+kafkaPartitions=1
+activeTasks=[]
+publishingTasks=[]
+latestOffsets={0=10}
+minimumLag={}
+aggregateLag=0
+#### A masked pattern was here ####
PREHOOK: query: Select count(*) FROM druid_kafka_test
PREHOOK: type: QUERY
PREHOOK: Input: default@druid_kafka_test
@@ -126,6 +135,15 @@ added int from deserializer
deleted int from deserializer
#### A masked pattern was here ####
+StorageHandlerInfo
+Druid Storage Handler Runtime Status for default.druid_kafka_test
+kafkaPartitions=1
+activeTasks=[]
+publishingTasks=[]
+latestOffsets={0=10}
+minimumLag={}
+aggregateLag=0
+#### A masked pattern was here ####
PREHOOK: query: Select count(*) FROM druid_kafka_test
PREHOOK: type: QUERY
PREHOOK: Input: default@druid_kafka_test
@@ -331,7 +349,7 @@ STAGE PLANS:
druid.kafka.ingestion.maxRowsInMemory 5
druid.kafka.ingestion.period PT1S
druid.kafka.ingestion.startDelay PT1S
- druid.kafka.ingestion.taskDuration PT30S
+ druid.kafka.ingestion.taskDuration PT20S
druid.kafka.ingestion.useEarliestOffset true
druid.query.granularity MINUTE
druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}
@@ -370,7 +388,7 @@ STAGE PLANS:
druid.kafka.ingestion.maxRowsInMemory 5
druid.kafka.ingestion.period PT1S
druid.kafka.ingestion.startDelay PT1S
- druid.kafka.ingestion.taskDuration PT30S
+ druid.kafka.ingestion.taskDuration PT20S
druid.kafka.ingestion.useEarliestOffset true
druid.query.granularity MINUTE
druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"}