Posted to commits@hive.apache.org by ha...@apache.org on 2020/05/10 00:36:30 UTC
[hive] branch master updated: HIVE-23184: Upgrade druid to 0.17.1 (Nishant Bangarwa via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9a6ec1e HIVE-23184: Upgrade druid to 0.17.1 (Nishant Bangarwa via Ashutosh Chauhan)
9a6ec1e is described below
commit 9a6ec1e351d59b76419a21d1f2c8781e306b02d0
Author: Nishant Bangarwa <ni...@gmail.com>
AuthorDate: Mon Apr 13 22:29:26 2020 +0530
HIVE-23184: Upgrade druid to 0.17.1 (Nishant Bangarwa via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
data/scripts/kafka_init_data.csv | 2 +-
druid-handler/pom.xml | 6 +
.../apache/hadoop/hive/druid/DruidKafkaUtils.java | 43 ++-
.../hadoop/hive/druid/DruidStorageHandler.java | 61 ++--
.../hive/druid/DruidStorageHandlerUtils.java | 62 ++---
.../hadoop/hive/druid/io/DruidOutputFormat.java | 22 +-
.../hive/druid/io/DruidQueryBasedInputFormat.java | 56 +---
.../hadoop/hive/druid/io/DruidRecordWriter.java | 10 +-
.../druid/json/KafkaIndexTaskTuningConfig.java | 128 +++++++++
.../hive/druid/json/KafkaSupervisorSpec.java | 20 +-
.../druid/json/KafkaSupervisorTuningConfig.java | 208 +++++---------
.../hadoop/hive/druid/json/KafkaTuningConfig.java | 307 --------------------
.../json/SeekableStreamIndexTaskTuningConfig.java | 308 +++++++++++++++++++++
.../json/SeekableStreamSupervisorTuningConfig.java | 59 ++++
.../hive/druid/serde/DruidQueryRecordReader.java | 3 +-
.../druid/serde/DruidSelectQueryRecordReader.java | 92 ------
.../hadoop/hive/druid/TestDruidStorageHandler.java | 8 +-
.../druid/TestHiveDruidQueryBasedInputFormat.java | 44 ---
.../hadoop/hive/druid/serde/TestDruidSerDe.java | 244 ++++------------
.../hadoop/hive/ql/io/TestDruidRecordWriter.java | 21 +-
itests/qtest-druid/pom.xml | 22 ++
pom.xml | 2 +-
.../druid/druidkafkamini_delimited.q.out | 2 +-
.../druidmini_semijoin_reduction_all_types.q.out | 24 +-
24 files changed, 753 insertions(+), 1001 deletions(-)
diff --git a/data/scripts/kafka_init_data.csv b/data/scripts/kafka_init_data.csv
index 5dc094e..d818144 100644
--- a/data/scripts/kafka_init_data.csv
+++ b/data/scripts/kafka_init_data.csv
@@ -1,4 +1,4 @@
-"2013-08-31T01:02:33Z", "Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143
+"2013-08-31T01:02:33Z","Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143
"2013-08-31T03:32:45Z","Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330
"2013-08-31T07:11:21Z","Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111
"2013-08-31T11:58:39Z","Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900
diff --git a/druid-handler/pom.xml b/druid-handler/pom.xml
index c7a2d4c..e6ca298 100644
--- a/druid-handler/pom.xml
+++ b/druid-handler/pom.xml
@@ -293,6 +293,12 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
index b56d48a..fb6ce30 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidKafkaUtils.java
@@ -29,8 +29,8 @@ import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.FullResponseHandler;
-import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@@ -66,11 +66,13 @@ final class DruidKafkaUtils {
private DruidKafkaUtils() {
}
- static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table,
+ static KafkaSupervisorSpec createKafkaSupervisorSpec(
+ Table table,
String kafkaTopic,
String kafkaServers,
DataSchema dataSchema,
- IndexSpec indexSpec) {
+ IndexSpec indexSpec
+ ) {
return new KafkaSupervisorSpec(dataSchema,
new KafkaSupervisorTuningConfig(DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"),
@@ -78,17 +80,14 @@ final class DruidKafkaUtils {
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxBytesInMemory"),
DruidStorageHandlerUtils.getIntegerProperty(table,
- DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"),
- DruidStorageHandlerUtils.getLongProperty(table,
- DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxTotalRows"),
+ DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), DruidStorageHandlerUtils
+ .getLongProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxTotalRows"),
DruidStorageHandlerUtils.getPeriodProperty(table,
- DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"),
- null,
+ DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), null,
// basePersistDirectory - use druid default, no need to be configured by user
- DruidStorageHandlerUtils.getIntegerProperty(table,
- DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"),
- indexSpec,
- null,
+ DruidStorageHandlerUtils
+ .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"),
+ indexSpec, null, null,
// buildV9Directly - use druid default, no need to be configured by user
DruidStorageHandlerUtils.getBooleanProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"),
@@ -96,9 +95,8 @@ final class DruidKafkaUtils {
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"),
DruidStorageHandlerUtils.getBooleanProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"),
- TmpFileSegmentWriteOutMediumFactory.instance(),
- DruidStorageHandlerUtils.getIntegerProperty(table,
- DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"),
+ TmpFileSegmentWriteOutMediumFactory.instance(), DruidStorageHandlerUtils
+ .getIntegerProperty(table, DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"),
DruidStorageHandlerUtils.getIntegerProperty(table,
DruidConstants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"),
DruidStorageHandlerUtils.getLongProperty(table,
@@ -161,14 +159,11 @@ final class DruidKafkaUtils {
String task = JSON_MAPPER.writeValueAsString(spec);
CONSOLE.printInfo("submitting kafka Spec {}", task);
LOG.info("submitting kafka Supervisor Spec {}", task);
- FullResponseHolder
- response =
- DruidStorageHandlerUtils.getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(),
- new Request(HttpMethod.POST,
- new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))).setContent(
- "application/json",
- JSON_MAPPER.writeValueAsBytes(spec)),
- new FullResponseHandler(Charset.forName("UTF-8")));
+ StringFullResponseHolder response = DruidStorageHandlerUtils
+ .getResponseFromCurrentLeader(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.POST,
+ new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
+ .setContent("application/json", JSON_MAPPER.writeValueAsBytes(spec)),
+ new StringFullResponseHandler(Charset.forName("UTF-8")));
if (response.getStatus().equals(HttpResponseStatus.OK)) {
String
msg =
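
[Editor's note] Druid 0.17 renames the generic HTTP response helpers: FullResponseHandler/FullResponseHolder become StringFullResponseHandler/StringFullResponseHolder, which is why every overlord call in this patch changes in lockstep. A minimal sketch of the new round trip, using only calls visible in this diff (the Netty import paths are assumptions, and the error handling is illustrative):

    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    import org.apache.druid.java.util.http.client.HttpClient;
    import org.apache.druid.java.util.http.client.Request;
    import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
    import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
    import org.jboss.netty.handler.codec.http.HttpMethod;
    import org.jboss.netty.handler.codec.http.HttpResponseStatus;

    // POST a supervisor spec and read the response body as a String,
    // mirroring DruidKafkaUtils above.
    static String postSupervisorSpec(HttpClient client, String overlordAddress, byte[] specJson)
        throws Exception {
      Request request = new Request(HttpMethod.POST,
          new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress)))
          .setContent("application/json", specJson);
      StringFullResponseHolder holder =
          client.go(request, new StringFullResponseHandler(StandardCharsets.UTF_8)).get();
      if (!HttpResponseStatus.OK.equals(holder.getStatus())) {
        throw new IllegalStateException("supervisor submission failed: " + holder.getStatus());
      }
      return holder.getContent(); // response body, now typed as String
    }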
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index fe55eff..beaf249 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -39,8 +39,8 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.HttpClientConfig;
import org.apache.druid.java.util.http.client.HttpClientInit;
import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.FullResponseHandler;
-import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
@@ -50,6 +50,7 @@ import org.apache.druid.metadata.storage.mysql.MySQLConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig;
import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnector;
import org.apache.druid.metadata.storage.postgresql.PostgreSQLConnectorConfig;
+import org.apache.druid.metadata.storage.postgresql.PostgreSQLTablesConfig;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.Query;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -365,15 +366,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
private void resetKafkaIngestion(String overlordAddress, String dataSourceName) {
try {
- FullResponseHolder
+ StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
- new Request(HttpMethod.POST,
- new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/reset",
- overlordAddress,
- dataSourceName))),
- new FullResponseHandler(Charset.forName("UTF-8"))),
- input -> input instanceof IOException,
+ new Request(HttpMethod.POST, new URL(
+ String.format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress, dataSourceName))),
+ new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
CONSOLE.printInfo("Druid Kafka Ingestion Reset successful.");
@@ -389,15 +387,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
private void stopKafkaIngestion(String overlordAddress, String dataSourceName) {
try {
- FullResponseHolder
+ StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
- new Request(HttpMethod.POST,
- new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown",
- overlordAddress,
- dataSourceName))),
- new FullResponseHandler(Charset.forName("UTF-8"))),
- input -> input instanceof IOException,
+ new Request(HttpMethod.POST, new URL(
+ String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress, dataSourceName))),
+ new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
CONSOLE.printInfo("Druid Kafka Ingestion shutdown successful.");
@@ -423,13 +418,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid Datasource name is null");
try {
- FullResponseHolder
+ StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
new Request(HttpMethod.GET,
new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress, dataSourceName))),
- new FullResponseHandler(Charset.forName("UTF-8"))),
- input -> input instanceof IOException,
+ new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
return JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorSpec.class);
@@ -465,15 +459,12 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
Preconditions.checkNotNull(DruidStorageHandlerUtils.getTableProperty(table, Constants.DRUID_DATA_SOURCE),
"Druid Datasource name is null");
try {
- FullResponseHolder
+ StringFullResponseHolder
response =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
- new Request(HttpMethod.GET,
- new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/status",
- overlordAddress,
- dataSourceName))),
- new FullResponseHandler(Charset.forName("UTF-8"))),
- input -> input instanceof IOException,
+ new Request(HttpMethod.GET, new URL(
+ String.format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress, dataSourceName))),
+ new StringFullResponseHandler(Charset.forName("UTF-8"))), input -> input instanceof IOException,
getMaxRetryCount());
if (response.getStatus().equals(HttpResponseStatus.OK)) {
return DruidStorageHandlerUtils.JSON_MAPPER.readValue(response.getContent(), KafkaSupervisorReport.class);
@@ -546,9 +537,8 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
coordinatorResponse =
RetryUtils.retry(() -> DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
new Request(HttpMethod.GET, new URL(String.format("http://%s/status", coordinatorAddress))),
- new FullResponseHandler(Charset.forName("UTF-8"))).getContent(),
- input -> input instanceof IOException,
- maxTries);
+ new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent(),
+ input -> input instanceof IOException, maxTries);
} catch (Exception e) {
CONSOLE.printInfo("Will skip waiting for data loading, coordinator unavailable");
return;
@@ -578,11 +568,9 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
while (numRetries++ < maxTries && !urlsOfUnloadedSegments.isEmpty()) {
urlsOfUnloadedSegments = ImmutableSet.copyOf(Sets.filter(urlsOfUnloadedSegments, input -> {
try {
- String
- result =
- DruidStorageHandlerUtils.getResponseFromCurrentLeader(getHttpClient(),
- new Request(HttpMethod.GET, input),
- new FullResponseHandler(Charset.forName("UTF-8"))).getContent();
+ String result = DruidStorageHandlerUtils
+ .getResponseFromCurrentLeader(getHttpClient(), new Request(HttpMethod.GET, input),
+ new StringFullResponseHandler(Charset.forName("UTF-8"))).getContent();
LOG.debug("Checking segment [{}] response is [{}]", input, result);
return Strings.isNullOrEmpty(result);
@@ -878,9 +866,8 @@ import static org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
case "postgresql":
connector =
new PostgreSQLConnector(storageConnectorConfigSupplier,
- Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()),
- new PostgreSQLConnectorConfig()
- );
+ Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), new PostgreSQLConnectorConfig(),
+ new PostgreSQLTablesConfig());
break;
case "derby":
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1d7009b..5723150 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.calcite.adapter.druid.DruidQuery;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.guice.BloomFilterSerializersModule;
@@ -46,9 +47,9 @@ import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.FullResponseHandler;
-import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
@@ -79,8 +80,6 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.select.SelectQuery;
-import org.apache.druid.query.select.SelectQueryConfig;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
@@ -225,25 +224,18 @@ public final class DruidStorageHandlerUtils {
private static final int DEFAULT_MAX_TRIES = 10;
static {
+ // This is needed to initialize NullHandling for Druid without Guice.
+ NullHandling.initializeForTests();
// This is needed for serde of PagingSpec as it uses JacksonInject for injecting SelectQueryConfig
- InjectableValues.Std
- injectableValues =
- new InjectableValues.Std().addValue(SelectQueryConfig.class, new SelectQueryConfig(false))
- // Expressions macro table used when we deserialize the query from calcite plan
- .addValue(ExprMacroTable.class,
- new ExprMacroTable(ImmutableList.of(new LikeExprMacro(),
- new RegexpExtractExprMacro(),
- new TimestampCeilExprMacro(),
- new TimestampExtractExprMacro(),
- new TimestampFormatExprMacro(),
- new TimestampParseExprMacro(),
- new TimestampShiftExprMacro(),
- new TimestampFloorExprMacro(),
- new TrimExprMacro.BothTrimExprMacro(),
- new TrimExprMacro.LeftTrimExprMacro(),
- new TrimExprMacro.RightTrimExprMacro())))
- .addValue(ObjectMapper.class, JSON_MAPPER)
- .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
+ InjectableValues.Std injectableValues = new InjectableValues.Std()
+ // Expressions macro table used when we deserialize the query from calcite plan
+ .addValue(ExprMacroTable.class, new ExprMacroTable(ImmutableList
+ .of(new LikeExprMacro(), new RegexpExtractExprMacro(), new TimestampCeilExprMacro(),
+ new TimestampExtractExprMacro(), new TimestampFormatExprMacro(), new TimestampParseExprMacro(),
+ new TimestampShiftExprMacro(), new TimestampFloorExprMacro(), new TrimExprMacro.BothTrimExprMacro(),
+ new TrimExprMacro.LeftTrimExprMacro(), new TrimExprMacro.RightTrimExprMacro())))
+ .addValue(ObjectMapper.class, JSON_MAPPER)
+ .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT);
JSON_MAPPER.setInjectableValues(injectableValues);
SMILE_MAPPER.setInjectableValues(injectableValues);
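
[Editor's note] The new NullHandling.initializeForTests() call is worth a note: inside a Druid server this module is wired up by Guice, but an embedded consumer of the druid libraries, such as this storage handler, must initialize it explicitly before any serde or expression code runs, otherwise 0.17 fails at first use. The same static block also reflects two removals elsewhere in the patch: the SelectQueryConfig injectable disappears along with the Select query type, and DataSegment.PruneLoadSpecHolder is renamed to DataSegment.PruneSpecsHolder. A minimal sketch of the embedded setup:

    import org.apache.druid.common.config.NullHandling;

    static {
      // Must run before any Druid serde/expression machinery is touched;
      // initializeForTests() is the no-Guice entry point this patch relies on.
      NullHandling.initializeForTests();
    }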
@@ -331,10 +323,9 @@ public final class DruidStorageHandlerUtils {
}
- static FullResponseHolder getResponseFromCurrentLeader(HttpClient client,
- Request request,
- FullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException {
- FullResponseHolder responseHolder = client.go(request, fullResponseHandler).get();
+ static StringFullResponseHolder getResponseFromCurrentLeader(HttpClient client, Request request,
+ StringFullResponseHandler fullResponseHandler) throws ExecutionException, InterruptedException {
+ StringFullResponseHolder responseHolder = client.go(request, fullResponseHandler).get();
if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(responseHolder.getStatus())) {
String redirectUrlStr = responseHolder.getResponse().headers().get("Location");
LOG.debug("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr);
@@ -342,9 +333,9 @@ public final class DruidStorageHandlerUtils {
try {
redirectUrl = new URL(redirectUrlStr);
} catch (MalformedURLException ex) {
- throw new ExecutionException(String.format(
- "Malformed redirect location is found in response from url[%s], new location[%s].",
- request.getUrl(),
+ throw new ExecutionException(String
+ .format("Malformed redirect location is found in response from url[%s], new location[%s].",
+ request.getUrl(),
redirectUrlStr), ex);
}
responseHolder = client.go(withUrl(request, redirectUrl), fullResponseHandler).get();
@@ -638,12 +629,11 @@ public final class DruidStorageHandlerUtils {
}
public static String createScanAllQuery(String dataSourceName, List<String> columns) throws JsonProcessingException {
- final ScanQuery.ScanQueryBuilder scanQueryBuilder = ScanQuery.newScanQueryBuilder();
+ final Druids.ScanQueryBuilder scanQueryBuilder = Druids.newScanQueryBuilder();
final List<Interval> intervals = Collections.singletonList(DEFAULT_INTERVAL);
ScanQuery
scanQuery =
- scanQueryBuilder.dataSource(dataSourceName)
- .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
+ scanQueryBuilder.dataSource(dataSourceName).resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.intervals(new MultipleIntervalSegmentSpec(intervals))
.columns(columns)
.build();
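
[Editor's note] The scan-query API also moved in 0.17: the builder now lives on Druids rather than ScanQuery, and the result-format string constants became the ScanQuery.ResultFormat enum. A standalone sketch of the same "scan everything" construction (the datasource, interval helper, and column names are illustrative):

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.query.Druids;
    import org.apache.druid.query.scan.ScanQuery;
    import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;

    ScanQuery scanAll = Druids.newScanQueryBuilder()
        .dataSource("wikipedia")        // illustrative datasource name
        .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
        .intervals(new MultipleIntervalSegmentSpec(
            Collections.singletonList(Intervals.ETERNITY))) // Intervals.ETERNITY: assumed helper
        .columns(Arrays.asList("__time", "page"))
        .build();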
@@ -977,11 +967,7 @@ public final class DruidStorageHandlerUtils {
.setVirtualColumns(VirtualColumns.create(virtualColumns)).build();
break;
case org.apache.druid.query.Query.SCAN:
- rv = ScanQuery.ScanQueryBuilder.copy((ScanQuery) query).filters(filter)
- .virtualColumns(VirtualColumns.create(virtualColumns)).build();
- break;
- case org.apache.druid.query.Query.SELECT:
- rv = Druids.SelectQueryBuilder.copy((SelectQuery) query).filters(filter)
+ rv = Druids.ScanQueryBuilder.copy((ScanQuery) query).filters(filter)
.virtualColumns(VirtualColumns.create(virtualColumns)).build();
break;
default:
@@ -1140,8 +1126,6 @@ public final class DruidStorageHandlerUtils {
return ((GroupByQuery) query).getVirtualColumns();
case org.apache.druid.query.Query.SCAN:
return ((ScanQuery) query).getVirtualColumns();
- case org.apache.druid.query.Query.SELECT:
- return ((SelectQuery) query).getVirtualColumns();
default:
throw new UnsupportedOperationException("Unsupported Query type " + query);
}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
index 6cf3ef2..d90db9c 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java
@@ -152,25 +152,9 @@ public class DruidOutputFormat implements HiveOutputFormat<NullWritable, DruidWr
Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY);
IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(jc);
- RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory,
- null,
- null,
- null,
- new File(basePersistDirectory, dataSource),
- new CustomVersioningPolicy(version),
- null,
- null,
- null,
- indexSpec,
- true,
- 0,
- 0,
- true,
- null,
- 0L,
- null,
- null
- );
+ RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory, null, null, null,
+ new File(basePersistDirectory, dataSource), new CustomVersioningPolicy(version), null, null, null, indexSpec,
+ null, true, 0, 0, true, null, 0L, null, null);
LOG.debug(String.format("running with Data schema [%s] ", dataSchema));
return new DruidRecordWriter(dataSchema, realtimeTuningConfig,
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 82a1f11..37e7206 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -19,28 +19,24 @@ package org.apache.hadoop.hive.druid.io;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.LocatedSegmentDescriptor;
import org.apache.druid.query.Query;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.select.PagingSpec;
-import org.apache.druid.query.select.SelectQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.DruidStorageHandler;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.conf.DruidConstants;
import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader;
-import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
import org.apache.hadoop.hive.druid.serde.DruidWritable;
@@ -88,8 +84,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
return new DruidTopNQueryRecordReader();
case Query.GROUP_BY:
return new DruidGroupByQueryRecordReader();
- case Query.SELECT:
- return new DruidSelectQueryRecordReader();
case Query.SCAN:
return new DruidScanQueryRecordReader();
default:
@@ -152,9 +146,6 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
case Query.TOPN:
case Query.GROUP_BY:
return new HiveDruidSplit[] {new HiveDruidSplit(druidQuery, paths[0], new String[] {address})};
- case Query.SELECT:
- SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, SelectQuery.class);
- return distributeSelectQuery(address, selectQuery, paths[0]);
case Query.SCAN:
ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(druidQuery, ScanQuery.class);
return distributeScanQuery(address, scanQuery, paths[0]);
@@ -163,54 +154,13 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
}
}
- /* New method that distributes the Select query by creating splits containing
- * information about different Druid nodes that have the data for the given
- * query. */
- private static HiveDruidSplit[] distributeSelectQuery(String address, SelectQuery query, Path dummyPath)
- throws IOException {
- // If it has a limit, we use it and we do not distribute the query
- final boolean isFetch = query.getContextBoolean(DruidConstants.DRUID_QUERY_FETCH, false);
- if (isFetch) {
- return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
- dummyPath,
- new String[] {address})};
- }
-
- final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(address, query);
-
- // Create one input split for each segment
- final int numSplits = segmentDescriptors.size();
- final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
- for (int i = 0; i < numSplits; i++) {
- final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
- final String[] hosts = new String[locatedSD.getLocations().size()];
- for (int j = 0; j < locatedSD.getLocations().size(); j++) {
- hosts[j] = locatedSD.getLocations().get(j).getHost();
- }
- // Create partial Select query
- final SegmentDescriptor
- newSD =
- new SegmentDescriptor(locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
- //@TODO This is fetching all the rows at once from broker or multiple historical nodes
- // Move to use scan query to avoid GC back pressure on the nodes
- // https://issues.apache.org/jira/browse/HIVE-17627
- final SelectQuery
- partialQuery =
- query.withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)))
- .withPagingSpec(PagingSpec.newSpec(Integer.MAX_VALUE));
- splits[i] =
- new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery), dummyPath, hosts);
- }
- return splits;
- }
-
/* New method that distributes the Scan query by creating splits containing
* information about different Druid nodes that have the data for the given
* query. */
private static HiveDruidSplit[] distributeScanQuery(String address, ScanQuery query, Path dummyPath)
throws IOException {
// If it has a limit, we use it and we do not distribute the query
- final boolean isFetch = query.getLimit() < Long.MAX_VALUE;
+ final boolean isFetch = query.getScanRowsLimit() < Long.MAX_VALUE;
if (isFetch) {
return new HiveDruidSplit[] {new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query),
dummyPath,
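
[Editor's note] Druid 0.17 removes the long-deprecated Select query type outright, so the Select record reader, the Select branch of split generation, and distributeSelectQuery all go away; Scan is now the only row-fetching path, which also addresses the GC-pressure TODO tracked in HIVE-17627. The surviving distributeScanQuery keeps the same per-segment split pattern the deleted code used; a hedged sketch of one partial query, reusing only types visible in this diff (the cast assumes withQuerySegmentSpec preserves the concrete query type, as it did on the removed Select path):

    import com.google.common.collect.Lists;
    import org.apache.druid.query.LocatedSegmentDescriptor;
    import org.apache.druid.query.SegmentDescriptor;
    import org.apache.druid.query.scan.ScanQuery;
    import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;

    // One input split per segment: restrict the scan to a single segment.
    static ScanQuery partialQueryFor(ScanQuery query, LocatedSegmentDescriptor located) {
      SegmentDescriptor segment = new SegmentDescriptor(
          located.getInterval(), located.getVersion(), located.getPartitionNumber());
      return (ScanQuery) query.withQuerySegmentSpec(
          new MultipleSpecificSegmentSpec(Lists.newArrayList(segment)));
    }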
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 248b59a..dc16c4e 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -103,13 +103,9 @@ public class DruidRecordWriter implements RecordWriter<NullWritable, DruidWritab
"realtimeTuningConfig is null");
this.dataSchema = Preconditions.checkNotNull(dataSchema, "data schema is null");
- appenderator =
- Appenderators.createOffline(this.dataSchema,
- tuningConfig,
- new FireDepartmentMetrics(),
- dataSegmentPusher,
- DruidStorageHandlerUtils.JSON_MAPPER,
- DruidStorageHandlerUtils.INDEX_IO,
+ appenderator = Appenderators
+ .createOffline("hive-offline-appenderator", this.dataSchema, tuningConfig, false, new FireDepartmentMetrics(),
+ dataSegmentPusher, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.INDEX_IO,
DruidStorageHandlerUtils.INDEX_MERGER_V9);
this.maxPartitionSize = maxPartitionSize;
appenderator.startJob();
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java
new file mode 100644
index 0000000..92800ce
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig {
+ @JsonCreator
+ public KafkaIndexTaskTuningConfig(
+ @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+ @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+ @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+ @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+ @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
+ @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
+ @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
+ @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+ @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
+ // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
+ @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+ @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
+ @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
+ @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
+ @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+ @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+ @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+ @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
+ ) {
+ super(
+ maxRowsInMemory,
+ maxBytesInMemory,
+ maxRowsPerSegment,
+ maxTotalRows,
+ intermediatePersistPeriod,
+ basePersistDirectory,
+ maxPendingPersists,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ true,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ resetOffsetAutomatically,
+ false,
+ segmentWriteOutMediumFactory,
+ intermediateHandoffPeriod,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions
+ );
+ }
+
+ @Override
+ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) {
+ return new KafkaIndexTaskTuningConfig(
+ getMaxRowsInMemory(),
+ getMaxBytesInMemory(),
+ getMaxRowsPerSegment(),
+ getMaxTotalRows(),
+ getIntermediatePersistPeriod(),
+ dir,
+ getMaxPendingPersists(),
+ getIndexSpec(),
+ getIndexSpecForIntermediatePersists(),
+ true,
+ isReportParseExceptions(),
+ getHandoffConditionTimeout(),
+ isResetOffsetAutomatically(),
+ getSegmentWriteOutMediumFactory(),
+ getIntermediateHandoffPeriod(),
+ isLogParseExceptions(),
+ getMaxParseExceptions(),
+ getMaxSavedParseExceptions()
+ );
+ }
+
+
+ @Override
+ public String toString() {
+ return "KafkaIndexTaskTuningConfig{" +
+ "maxRowsInMemory=" + getMaxRowsInMemory() +
+ ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
+ ", maxTotalRows=" + getMaxTotalRows() +
+ ", maxBytesInMemory=" + getMaxBytesInMemory() +
+ ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
+ ", basePersistDirectory=" + getBasePersistDirectory() +
+ ", maxPendingPersists=" + getMaxPendingPersists() +
+ ", indexSpec=" + getIndexSpec() +
+ ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
+ ", reportParseExceptions=" + isReportParseExceptions() +
+ ", handoffConditionTimeout=" + getHandoffConditionTimeout() +
+ ", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
+ ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
+ ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
+ ", logParseExceptions=" + isLogParseExceptions() +
+ ", maxParseExceptions=" + getMaxParseExceptions() +
+ ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
+ '}';
+ }
+
+}
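
[Editor's note] Like the KafkaTuningConfig it replaces, this class is copied from the Druid source tree so the handler can serialize supervisor specs without depending on druid-indexing-service. Every constructor argument is nullable; an all-defaults instance is simply the 18-argument constructor with nulls, which is exactly how defaultConfig() in KafkaSupervisorTuningConfig below works. A small usage sketch:

    // All defaults: each null defers to the Druid-side default for that field.
    KafkaIndexTaskTuningConfig tuning = new KafkaIndexTaskTuningConfig(
        null, null, null, null, null, null, null, null, null,
        null, null, null, null, null, null, null, null, null);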
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
index d230832..18177b6 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java
@@ -52,24 +52,8 @@ import java.util.Map;
null,
null,
null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null);
+ null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
+ null, null);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.context = context;
}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
index b4d38b9..4e17161 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java
@@ -34,9 +34,14 @@ import java.io.File;
* This class is copied from druid source code
* in order to avoid adding additional dependencies on druid-indexing-service.
*/
-@SuppressWarnings("ALL") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({
- @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class) })
-public class KafkaSupervisorTuningConfig extends KafkaTuningConfig {
+@SuppressWarnings("unused")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class)})
+public class KafkaSupervisorTuningConfig
+ extends KafkaIndexTaskTuningConfig implements SeekableStreamSupervisorTuningConfig {
+ private static final String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S";
+
private final Integer workerThreads;
private final Integer chatThreads;
private final Long chatRetries;
@@ -44,174 +49,113 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig {
private final Duration shutdownTimeout;
private final Duration offsetFetchPeriod;
- public KafkaSupervisorTuningConfig(@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+ public static KafkaSupervisorTuningConfig defaultConfig() {
+ return new KafkaSupervisorTuningConfig(null, null, null, null, null, null, null, null, null, null, null, null, null,
+ null, null, null, null, null, null, null, null, null, null, null);
+ }
+
+ public KafkaSupervisorTuningConfig(
+ @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
- @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
- @JsonProperty("maxTotalRows") Long maxTotalRows,
+ @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
- @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
- @JsonProperty("indexSpec") IndexSpec indexSpec,
+ @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec,
+ @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
- @JsonProperty("workerThreads") Integer workerThreads,
- @JsonProperty("chatThreads") Integer chatThreads,
- @JsonProperty("chatRetries") Long chatRetries,
- @JsonProperty("httpTimeout") Period httpTimeout,
+ @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads,
+ @JsonProperty("chatRetries") Long chatRetries, @JsonProperty("httpTimeout") Period httpTimeout,
@JsonProperty("shutdownTimeout") Period shutdownTimeout,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) {
- super(maxRowsInMemory,
- maxBytesInMemory,
- maxRowsPerSegment,
- maxTotalRows,
- intermediatePersistPeriod,
- basePersistDirectory,
- maxPendingPersists,
- indexSpec,
- true,
- reportParseExceptions,
- handoffConditionTimeout,
- resetOffsetAutomatically,
- segmentWriteOutMediumFactory,
- intermediateHandoffPeriod,
- logParseExceptions,
- maxParseExceptions,
- maxSavedParseExceptions);
-
+ super(maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, maxTotalRows, intermediatePersistPeriod,
+ basePersistDirectory, maxPendingPersists, indexSpec, indexSpecForIntermediatePersists, true,
+ reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, segmentWriteOutMediumFactory,
+ intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, maxSavedParseExceptions);
this.workerThreads = workerThreads;
this.chatThreads = chatThreads;
- this.chatRetries = (chatRetries != null ? chatRetries : 8);
- this.httpTimeout = defaultDuration(httpTimeout, "PT10S");
- this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S");
- this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S");
+ this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
+ this.httpTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(httpTimeout, DEFAULT_HTTP_TIMEOUT);
+ this.shutdownTimeout =
+ SeekableStreamSupervisorTuningConfig.defaultDuration(shutdownTimeout, DEFAULT_SHUTDOWN_TIMEOUT);
+ this.offsetFetchPeriod =
+ SeekableStreamSupervisorTuningConfig.defaultDuration(offsetFetchPeriod, DEFAULT_OFFSET_FETCH_PERIOD);
}
- @JsonProperty public Integer getWorkerThreads() {
+ @Override
+ @JsonProperty
+ public Integer getWorkerThreads() {
return workerThreads;
}
- @JsonProperty public Integer getChatThreads() {
+ @Override
+ @JsonProperty
+ public Integer getChatThreads() {
return chatThreads;
}
- @JsonProperty public Long getChatRetries() {
+ @Override
+ @JsonProperty
+ public Long getChatRetries() {
return chatRetries;
}
- @JsonProperty public Duration getHttpTimeout() {
+ @Override
+ @JsonProperty
+ public Duration getHttpTimeout() {
return httpTimeout;
}
- @JsonProperty public Duration getShutdownTimeout() {
+ @Override
+ @JsonProperty
+ public Duration getShutdownTimeout() {
return shutdownTimeout;
}
- @JsonProperty public Duration getOffsetFetchPeriod() {
- return offsetFetchPeriod;
- }
-
- @Override public String toString() {
- return "KafkaSupervisorTuningConfig{"
- + "maxRowsInMemory="
- + getMaxRowsInMemory()
- + ", maxRowsPerSegment="
- + getMaxRowsPerSegment()
- + ", maxTotalRows="
- + getMaxTotalRows()
- + ", maxBytesInMemory="
- + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory())
- + ", intermediatePersistPeriod="
- + getIntermediatePersistPeriod()
- + ", basePersistDirectory="
- + getBasePersistDirectory()
- + ", maxPendingPersists="
- + getMaxPendingPersists()
- + ", indexSpec="
- + getIndexSpec()
- + ", reportParseExceptions="
- + isReportParseExceptions()
- + ", handoffConditionTimeout="
- + getHandoffConditionTimeout()
- + ", resetOffsetAutomatically="
- + isResetOffsetAutomatically()
- + ", segmentWriteOutMediumFactory="
- + getSegmentWriteOutMediumFactory()
- + ", workerThreads="
- + workerThreads
- + ", chatThreads="
- + chatThreads
- + ", chatRetries="
- + chatRetries
- + ", httpTimeout="
- + httpTimeout
- + ", shutdownTimeout="
- + shutdownTimeout
- + ", offsetFetchPeriod="
- + offsetFetchPeriod
- + ", intermediateHandoffPeriod="
- + getIntermediateHandoffPeriod()
- + ", logParseExceptions="
- + isLogParseExceptions()
- + ", maxParseExceptions="
- + getMaxParseExceptions()
- + ", maxSavedParseExceptions="
- + getMaxSavedParseExceptions()
- + '}';
+ @Override
+ public Duration getRepartitionTransitionDuration() {
+ // Stopping tasks early for Kafka ingestion on partition set change is not supported yet,
+ // just return a default for now.
+ return SeekableStreamSupervisorTuningConfig
+ .defaultDuration(null, SeekableStreamSupervisorTuningConfig.DEFAULT_REPARTITION_TRANSITION_DURATION);
}
- private static Duration defaultDuration(final Period period, final String theDefault) {
- return (period == null ? new Period(theDefault) : period).toStandardDuration();
+ @JsonProperty
+ public Duration getOffsetFetchPeriod() {
+ return offsetFetchPeriod;
}
- @Override public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
-
- KafkaSupervisorTuningConfig that = (KafkaSupervisorTuningConfig) o;
-
- if (workerThreads != null ? !workerThreads.equals(that.workerThreads) : that.workerThreads != null) {
- return false;
- }
- if (chatThreads != null ? !chatThreads.equals(that.chatThreads) : that.chatThreads != null) {
- return false;
- }
- if (chatRetries != null ? !chatRetries.equals(that.chatRetries) : that.chatRetries != null) {
- return false;
- }
- if (httpTimeout != null ? !httpTimeout.equals(that.httpTimeout) : that.httpTimeout != null) {
- return false;
- }
- if (shutdownTimeout != null ? !shutdownTimeout.equals(that.shutdownTimeout) : that.shutdownTimeout != null) {
- return false;
- }
- return offsetFetchPeriod != null ?
- offsetFetchPeriod.equals(that.offsetFetchPeriod) :
- that.offsetFetchPeriod == null;
+ @Override
+ public String toString() {
+ return "KafkaSupervisorTuningConfig{" + "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment="
+ + getMaxRowsPerSegment() + ", maxTotalRows=" + getMaxTotalRows() + ", maxBytesInMemory=" + TuningConfigs
+ .getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + ", intermediatePersistPeriod="
+ + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory()
+ + ", maxPendingPersists=" + getMaxPendingPersists() + ", indexSpec=" + getIndexSpec()
+ + ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout="
+ + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically()
+ + ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + ", workerThreads=" + workerThreads
+ + ", chatThreads=" + chatThreads + ", chatRetries=" + chatRetries + ", httpTimeout=" + httpTimeout
+ + ", shutdownTimeout=" + shutdownTimeout + ", offsetFetchPeriod=" + offsetFetchPeriod
+ + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", logParseExceptions="
+ + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions="
+ + getMaxSavedParseExceptions() + '}';
}
- @Override public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (workerThreads != null ? workerThreads.hashCode() : 0);
- result = 31 * result + (chatThreads != null ? chatThreads.hashCode() : 0);
- result = 31 * result + (chatRetries != null ? chatRetries.hashCode() : 0);
- result = 31 * result + (httpTimeout != null ? httpTimeout.hashCode() : 0);
- result = 31 * result + (shutdownTimeout != null ? shutdownTimeout.hashCode() : 0);
- result = 31 * result + (offsetFetchPeriod != null ? offsetFetchPeriod.hashCode() : 0);
- return result;
+ @Override
+ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() {
+ return new KafkaIndexTaskTuningConfig(getMaxRowsInMemory(), getMaxBytesInMemory(), getMaxRowsPerSegment(),
+ getMaxTotalRows(), getIntermediatePersistPeriod(), getBasePersistDirectory(), getMaxPendingPersists(),
+ getIndexSpec(), getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(),
+ getHandoffConditionTimeout(), isResetOffsetAutomatically(), getSegmentWriteOutMediumFactory(),
+ getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), getMaxSavedParseExceptions());
}
}
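
[Editor's note] The supervisor tuning config is likewise a straight copy, retargeted at the new SeekableStream hierarchy: it now extends the KafkaIndexTaskTuningConfig above and implements the copied SeekableStreamSupervisorTuningConfig interface, while the class-level @JsonTypeInfo/@JsonSubTypes pair stamps a "type": "kafka" discriminator into the JSON. A hedged round-trip sketch through the shared JSON_MAPPER (assuming Jackson emits the discriminator when serializing the concrete class):

    import java.io.IOException;

    import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
    import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;

    static KafkaSupervisorTuningConfig roundTrip() throws IOException {
      KafkaSupervisorTuningConfig defaults = KafkaSupervisorTuningConfig.defaultConfig();
      String json = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(defaults);
      return DruidStorageHandlerUtils.JSON_MAPPER.readValue(json, KafkaSupervisorTuningConfig.class);
    }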
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
deleted file mode 100644
index 45ac77b..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.druid.json;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfig;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.joda.time.Period;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.util.Objects;
-
-/**
- * This class is copied from druid source code
- * in order to avoid adding additional dependencies on druid-indexing-service.
- */
-public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig {
- private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
- private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
-
- private final int maxRowsInMemory;
- private final long maxBytesInMemory;
- private final int maxRowsPerSegment;
- @Nullable private final Long maxTotalRows;
- private final Period intermediatePersistPeriod;
- private final File basePersistDirectory;
- @Deprecated private final int maxPendingPersists;
- private final IndexSpec indexSpec;
- private final boolean reportParseExceptions;
- @Deprecated private final long handoffConditionTimeout;
- private final boolean resetOffsetAutomatically;
- @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
- private final Period intermediateHandoffPeriod;
-
- private final boolean logParseExceptions;
- private final int maxParseExceptions;
- private final int maxSavedParseExceptions;
-
- @JsonCreator public KafkaTuningConfig(@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
- @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
- @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
- @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
- @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
- @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
- @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
- // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
- @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
- @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
- @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
- @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
- @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
- @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
- @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
- @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
- @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions) {
- // Cannot be a static because default basePersistDirectory is unique per-instance
- final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
-
- this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
- this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
- // initializing this to 0, it will be lazily initialized to a value
- // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
- this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
- this.maxTotalRows = maxTotalRows;
- this.intermediatePersistPeriod =
- intermediatePersistPeriod == null ? defaults.getIntermediatePersistPeriod() : intermediatePersistPeriod;
- this.basePersistDirectory = defaults.getBasePersistDirectory();
- this.maxPendingPersists = 0;
- this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
- this.reportParseExceptions =
- reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions;
- this.handoffConditionTimeout =
- handoffConditionTimeout == null ? defaults.getHandoffConditionTimeout() : handoffConditionTimeout;
- this.resetOffsetAutomatically =
- resetOffsetAutomatically == null ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically;
- this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
- this.intermediateHandoffPeriod =
- intermediateHandoffPeriod == null ? new Period().withDays(Integer.MAX_VALUE) : intermediateHandoffPeriod;
-
- if (this.reportParseExceptions) {
- this.maxParseExceptions = 0;
- this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
- } else {
- this.maxParseExceptions =
- maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions;
- this.maxSavedParseExceptions =
- maxSavedParseExceptions == null ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS : maxSavedParseExceptions;
- }
- this.logParseExceptions =
- logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
- }
-
- public static KafkaTuningConfig copyOf(KafkaTuningConfig config) {
- return new KafkaTuningConfig(config.maxRowsInMemory,
- config.maxBytesInMemory,
- config.maxRowsPerSegment,
- config.maxTotalRows,
- config.intermediatePersistPeriod,
- config.basePersistDirectory,
- config.maxPendingPersists,
- config.indexSpec,
- true,
- config.reportParseExceptions,
- config.handoffConditionTimeout,
- config.resetOffsetAutomatically,
- config.segmentWriteOutMediumFactory,
- config.intermediateHandoffPeriod,
- config.logParseExceptions,
- config.maxParseExceptions,
- config.maxSavedParseExceptions);
- }
-
- @Override @JsonProperty public int getMaxRowsInMemory() {
- return maxRowsInMemory;
- }
-
- @Override @JsonProperty public long getMaxBytesInMemory() {
- return maxBytesInMemory;
- }
-
- @Override @JsonProperty public Integer getMaxRowsPerSegment() {
- return maxRowsPerSegment;
- }
-
- @JsonProperty @Override @Nullable public Long getMaxTotalRows() {
- return maxTotalRows;
- }
-
- @Override @JsonProperty public Period getIntermediatePersistPeriod() {
- return intermediatePersistPeriod;
- }
-
- @Override @JsonProperty public File getBasePersistDirectory() {
- return basePersistDirectory;
- }
-
- @Override @JsonProperty @Deprecated public int getMaxPendingPersists() {
- return maxPendingPersists;
- }
-
- @Override @JsonProperty public IndexSpec getIndexSpec() {
- return indexSpec;
- }
-
- /**
- * Always returns true, doesn't affect the version being built.
- */
- @SuppressWarnings("SameReturnValue") @Deprecated @JsonProperty public boolean getBuildV9Directly() {
- return true;
- }
-
- @Override @JsonProperty public boolean isReportParseExceptions() {
- return reportParseExceptions;
- }
-
- @JsonProperty public long getHandoffConditionTimeout() {
- return handoffConditionTimeout;
- }
-
- @JsonProperty public boolean isResetOffsetAutomatically() {
- return resetOffsetAutomatically;
- }
-
- @Override @JsonProperty @Nullable public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
- return segmentWriteOutMediumFactory;
- }
-
- @JsonProperty public Period getIntermediateHandoffPeriod() {
- return intermediateHandoffPeriod;
- }
-
- @JsonProperty public boolean isLogParseExceptions() {
- return logParseExceptions;
- }
-
- @JsonProperty public int getMaxParseExceptions() {
- return maxParseExceptions;
- }
-
- @JsonProperty public int getMaxSavedParseExceptions() {
- return maxSavedParseExceptions;
- }
-
- public KafkaTuningConfig withBasePersistDirectory(File dir) {
- return new KafkaTuningConfig(maxRowsInMemory,
- maxBytesInMemory,
- maxRowsPerSegment,
- maxTotalRows,
- intermediatePersistPeriod,
- dir,
- maxPendingPersists,
- indexSpec,
- true,
- reportParseExceptions,
- handoffConditionTimeout,
- resetOffsetAutomatically,
- segmentWriteOutMediumFactory,
- intermediateHandoffPeriod,
- logParseExceptions,
- maxParseExceptions,
- maxSavedParseExceptions);
- }
-
- @Override public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- KafkaTuningConfig that = (KafkaTuningConfig) o;
- return maxRowsInMemory == that.maxRowsInMemory
- && maxRowsPerSegment == that.maxRowsPerSegment
- && maxBytesInMemory == that.maxBytesInMemory
- && Objects.equals(maxTotalRows, that.maxTotalRows)
- && maxPendingPersists == that.maxPendingPersists
- && reportParseExceptions == that.reportParseExceptions
- && handoffConditionTimeout == that.handoffConditionTimeout
- && resetOffsetAutomatically == that.resetOffsetAutomatically
- && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod)
- && Objects.equals(basePersistDirectory, that.basePersistDirectory)
- && Objects.equals(indexSpec, that.indexSpec)
- && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory)
- && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod)
- && logParseExceptions == that.logParseExceptions
- && maxParseExceptions == that.maxParseExceptions
- && maxSavedParseExceptions == that.maxSavedParseExceptions;
- }
-
- @Override public int hashCode() {
- return Objects.hash(maxRowsInMemory,
- maxRowsPerSegment,
- maxBytesInMemory,
- maxTotalRows,
- intermediatePersistPeriod,
- basePersistDirectory,
- maxPendingPersists,
- indexSpec,
- reportParseExceptions,
- handoffConditionTimeout,
- resetOffsetAutomatically,
- segmentWriteOutMediumFactory,
- intermediateHandoffPeriod,
- logParseExceptions,
- maxParseExceptions,
- maxSavedParseExceptions);
- }
-
- @Override public String toString() {
- return "KafkaTuningConfig{"
- + "maxRowsInMemory="
- + maxRowsInMemory
- + ", maxRowsPerSegment="
- + maxRowsPerSegment
- + ", maxTotalRows="
- + maxTotalRows
- + ", maxBytesInMemory="
- + maxBytesInMemory
- + ", intermediatePersistPeriod="
- + intermediatePersistPeriod
- + ", basePersistDirectory="
- + basePersistDirectory
- + ", maxPendingPersists="
- + maxPendingPersists
- + ", indexSpec="
- + indexSpec
- + ", reportParseExceptions="
- + reportParseExceptions
- + ", handoffConditionTimeout="
- + handoffConditionTimeout
- + ", resetOffsetAutomatically="
- + resetOffsetAutomatically
- + ", segmentWriteOutMediumFactory="
- + segmentWriteOutMediumFactory
- + ", intermediateHandoffPeriod="
- + intermediateHandoffPeriod
- + ", logParseExceptions="
- + logParseExceptions
- + ", maxParseExceptions="
- + maxParseExceptions
- + ", maxSavedParseExceptions="
- + maxSavedParseExceptions
- + '}';
- }
-}
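The parse-exception clamping removed above is carried over verbatim into the new base class below: enabling reportParseExceptions degenerates to strict ingestion. A minimal sketch of the effective values (illustrative only, not part of the patch):

    // reportParseExceptions == true forces strict mode:
    //   maxParseExceptions      -> 0 (fail on the first unparseable row)
    //   maxSavedParseExceptions -> Math.min(1, requested), i.e. at most one sample
    int maxParseExceptions = 0;
    int maxSavedParseExceptions = Math.min(1, 5); // a caller asking for 5 still gets 1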
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java
new file mode 100644
index 0000000..289f0e8
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.indexing.RealtimeTuningConfig;
+import org.apache.druid.segment.indexing.TuningConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+/**
+ * This class is copied from the Druid source code
+ * in order to avoid an additional dependency on druid-indexing-service.
+ */
+public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig {
+ private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
+ private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false;
+
+ private final int maxRowsInMemory;
+ private final long maxBytesInMemory;
+ private final DynamicPartitionsSpec partitionsSpec;
+ private final Period intermediatePersistPeriod;
+ private final File basePersistDirectory;
+ @Deprecated
+ private final int maxPendingPersists;
+ private final IndexSpec indexSpec;
+ private final IndexSpec indexSpecForIntermediatePersists;
+ private final boolean reportParseExceptions;
+ private final long handoffConditionTimeout;
+ private final boolean resetOffsetAutomatically;
+ @Nullable
+ private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+ private final Period intermediateHandoffPeriod;
+ private final boolean skipSequenceNumberAvailabilityCheck;
+
+ private final boolean logParseExceptions;
+ private final int maxParseExceptions;
+ private final int maxSavedParseExceptions;
+
+ public SeekableStreamIndexTaskTuningConfig(
+ @Nullable Integer maxRowsInMemory,
+ @Nullable Long maxBytesInMemory,
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Long maxTotalRows,
+ @Nullable Period intermediatePersistPeriod,
+ @Nullable File basePersistDirectory,
+ @Nullable Integer maxPendingPersists,
+ @Nullable IndexSpec indexSpec,
+ @Nullable IndexSpec indexSpecForIntermediatePersists,
+ // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
+ @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+ @Deprecated @Nullable Boolean reportParseExceptions,
+ @Nullable Long handoffConditionTimeout,
+ @Nullable Boolean resetOffsetAutomatically,
+ Boolean skipSequenceNumberAvailabilityCheck,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @Nullable Period intermediateHandoffPeriod,
+ @Nullable Boolean logParseExceptions,
+ @Nullable Integer maxParseExceptions,
+ @Nullable Integer maxSavedParseExceptions
+ ) {
+ // Cannot be static because the default basePersistDirectory is unique per instance
+ final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
+
+ this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
+ this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
+ // Initialize this to 0; it is lazily replaced with a real value.
+ // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+ this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+ this.intermediatePersistPeriod = intermediatePersistPeriod == null
+ ? defaults.getIntermediatePersistPeriod()
+ : intermediatePersistPeriod;
+ this.basePersistDirectory = defaults.getBasePersistDirectory();
+ this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists;
+ this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
+ this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
+ this.indexSpec : indexSpecForIntermediatePersists;
+ this.reportParseExceptions = reportParseExceptions == null
+ ? defaults.isReportParseExceptions()
+ : reportParseExceptions;
+ this.handoffConditionTimeout = handoffConditionTimeout == null
+ ? defaults.getHandoffConditionTimeout()
+ : handoffConditionTimeout;
+ this.resetOffsetAutomatically = resetOffsetAutomatically == null
+ ? DEFAULT_RESET_OFFSET_AUTOMATICALLY
+ : resetOffsetAutomatically;
+ this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+ this.intermediateHandoffPeriod = intermediateHandoffPeriod == null
+ ? new Period().withDays(Integer.MAX_VALUE)
+ : intermediateHandoffPeriod;
+ this.skipSequenceNumberAvailabilityCheck = skipSequenceNumberAvailabilityCheck == null
+ ? DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK
+ : skipSequenceNumberAvailabilityCheck;
+
+ if (this.reportParseExceptions) {
+ this.maxParseExceptions = 0;
+ this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
+ } else {
+ this.maxParseExceptions = maxParseExceptions == null
+ ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS
+ : maxParseExceptions;
+ this.maxSavedParseExceptions = maxSavedParseExceptions == null
+ ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
+ : maxSavedParseExceptions;
+ }
+ this.logParseExceptions = logParseExceptions == null
+ ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
+ : logParseExceptions;
+ }
+
+ @Override
+ @JsonProperty
+ public int getMaxRowsInMemory() {
+ return maxRowsInMemory;
+ }
+
+ @Override
+ @JsonProperty
+ public long getMaxBytesInMemory() {
+ return maxBytesInMemory;
+ }
+
+ @Override
+ @JsonProperty
+ public Integer getMaxRowsPerSegment() {
+ return partitionsSpec.getMaxRowsPerSegment();
+ }
+
+ @JsonProperty
+ @Override
+ @Nullable
+ public Long getMaxTotalRows() {
+ return partitionsSpec.getMaxTotalRows();
+ }
+
+ @Override
+ public DynamicPartitionsSpec getPartitionsSpec() {
+ return partitionsSpec;
+ }
+
+ @Override
+ @JsonProperty
+ public Period getIntermediatePersistPeriod() {
+ return intermediatePersistPeriod;
+ }
+
+ @Override
+ @JsonProperty
+ public File getBasePersistDirectory() {
+ return basePersistDirectory;
+ }
+
+ @Override
+ @JsonProperty
+ @Deprecated
+ public int getMaxPendingPersists() {
+ return maxPendingPersists;
+ }
+
+ @Override
+ @JsonProperty
+ public IndexSpec getIndexSpec() {
+ return indexSpec;
+ }
+
+ @JsonProperty
+ @Override
+ public IndexSpec getIndexSpecForIntermediatePersists() {
+ return indexSpecForIntermediatePersists;
+ }
+
+ /**
+ * Always returns true and doesn't affect the version being built.
+ */
+ @Deprecated
+ @JsonProperty
+ public boolean getBuildV9Directly() {
+ return true;
+ }
+
+ @Override
+ @JsonProperty
+ public boolean isReportParseExceptions() {
+ return reportParseExceptions;
+ }
+
+ @JsonProperty
+ public long getHandoffConditionTimeout() {
+ return handoffConditionTimeout;
+ }
+
+ @JsonProperty
+ public boolean isResetOffsetAutomatically() {
+ return resetOffsetAutomatically;
+ }
+
+ @Override
+ @JsonProperty
+ @Nullable
+ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
+ return segmentWriteOutMediumFactory;
+ }
+
+ @JsonProperty
+ public Period getIntermediateHandoffPeriod() {
+ return intermediateHandoffPeriod;
+ }
+
+ @JsonProperty
+ public boolean isLogParseExceptions() {
+ return logParseExceptions;
+ }
+
+ @JsonProperty
+ public int getMaxParseExceptions() {
+ return maxParseExceptions;
+ }
+
+ @JsonProperty
+ public int getMaxSavedParseExceptions() {
+ return maxSavedParseExceptions;
+ }
+
+ @JsonProperty
+ public boolean isSkipSequenceNumberAvailabilityCheck() {
+ return skipSequenceNumberAvailabilityCheck;
+ }
+
+ @Override
+ public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir);
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
+ return maxRowsInMemory == that.maxRowsInMemory &&
+ maxBytesInMemory == that.maxBytesInMemory &&
+ maxPendingPersists == that.maxPendingPersists &&
+ reportParseExceptions == that.reportParseExceptions &&
+ handoffConditionTimeout == that.handoffConditionTimeout &&
+ resetOffsetAutomatically == that.resetOffsetAutomatically &&
+ skipSequenceNumberAvailabilityCheck == that.skipSequenceNumberAvailabilityCheck &&
+ logParseExceptions == that.logParseExceptions &&
+ maxParseExceptions == that.maxParseExceptions &&
+ maxSavedParseExceptions == that.maxSavedParseExceptions &&
+ Objects.equals(partitionsSpec, that.partitionsSpec) &&
+ Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
+ Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
+ Objects.equals(indexSpec, that.indexSpec) &&
+ Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
+ Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
+ Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ maxRowsInMemory,
+ maxBytesInMemory,
+ partitionsSpec,
+ intermediatePersistPeriod,
+ basePersistDirectory,
+ maxPendingPersists,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ resetOffsetAutomatically,
+ segmentWriteOutMediumFactory,
+ intermediateHandoffPeriod,
+ skipSequenceNumberAvailabilityCheck,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions
+ );
+ }
+
+ @Override
+ public abstract String toString();
+}
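Note the structural change relative to the deleted KafkaTuningConfig: maxRowsPerSegment and maxTotalRows are no longer stored as separate fields but are folded into a DynamicPartitionsSpec, with the getters delegating to it. A minimal sketch, assuming the Druid 0.17.1 org.apache.druid.indexer.partitions API:

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;

    // Both row limits now live in a single spec object; null means "use the default".
    DynamicPartitionsSpec spec = new DynamicPartitionsSpec(5_000_000, 20_000_000L);
    spec.getMaxRowsPerSegment(); // Integer 5000000 -- what getMaxRowsPerSegment() above returns
    spec.getMaxTotalRows();      // Long 20000000 -- what getMaxTotalRows() above returns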
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java
new file mode 100644
index 0000000..ec71518
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid.json;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+/**
+ * This class is copied from the Druid source code
+ * in order to avoid an additional dependency on druid-indexing-service.
+ */
+public interface SeekableStreamSupervisorTuningConfig {
+
+ int DEFAULT_CHAT_RETRIES = 8;
+ String DEFAULT_HTTP_TIMEOUT = "PT10S";
+ String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S";
+ String DEFAULT_REPARTITION_TRANSITION_DURATION = "PT2M";
+
+ static Duration defaultDuration(final Period period, final String theDefault) {
+ return (period == null ? new Period(theDefault) : period).toStandardDuration();
+ }
+
+ @JsonProperty
+ Integer getWorkerThreads();
+
+ @JsonProperty
+ Integer getChatThreads();
+
+ @JsonProperty
+ Long getChatRetries();
+
+ @JsonProperty
+ Duration getHttpTimeout();
+
+ @JsonProperty
+ Duration getShutdownTimeout();
+
+ @JsonProperty
+ Duration getRepartitionTransitionDuration();
+
+ SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig();
+}
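The static defaultDuration helper gives implementors a uniform null fallback for the timeout properties. A quick usage sketch (illustrative, not from the patch):

    import org.joda.time.Duration;

    // A null period falls back to the ISO-8601 default string.
    Duration httpTimeout = SeekableStreamSupervisorTuningConfig
        .defaultDuration(null, SeekableStreamSupervisorTuningConfig.DEFAULT_HTTP_TIMEOUT);
    // httpTimeout.getStandardSeconds() == 10, from "PT10S"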
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 4142e48..19379e1 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -128,7 +128,8 @@ public abstract class DruidQueryRecordReader<R extends Comparable<R>> extends Re
// We got exception while querying results from this host.
CloseQuietly.close(iterator);
}
- LOG.error("Failure getting results for query[{}] from host[{}] because of [{}]", query, address, e.getMessage());
+ LOG.error("Failure getting results for query[{}] from host[{}] because of [{}]",
+ query, address, e.getMessage());
if (ex == null) {
ex = e;
} else {
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
deleted file mode 100644
index 907558f..0000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid.serde;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-import com.fasterxml.jackson.databind.JavaType;
-import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
-import org.apache.hadoop.hive.druid.conf.DruidConstants;
-import org.apache.hadoop.io.NullWritable;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-
-import org.apache.druid.query.Result;
-import org.apache.druid.query.select.EventHolder;
-import org.apache.druid.query.select.SelectResultValue;
-
-/**
- * Record reader for results for Druid SelectQuery.
- */
-public class DruidSelectQueryRecordReader extends DruidQueryRecordReader<Result<SelectResultValue>> {
-
- private static final TypeReference<Result<SelectResultValue>>
- TYPE_REFERENCE =
- new TypeReference<Result<SelectResultValue>>() {
- };
-
- private Iterator<EventHolder> values = Collections.emptyIterator();
-
- @Override protected JavaType getResultTypeDef() {
- return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
- }
-
- @Override public boolean nextKeyValue() throws IOException {
- if (values.hasNext()) {
- return true;
- }
- if (getQueryResultsIterator().hasNext()) {
- Result<SelectResultValue> current = getQueryResultsIterator().next();
- values = current.getValue().getEvents().iterator();
- return nextKeyValue();
- }
- return false;
- }
-
- @Override public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return NullWritable.get();
- }
-
- @Override public DruidWritable getCurrentValue() throws IOException, InterruptedException {
- // Create new value
- DruidWritable value = new DruidWritable(false);
- EventHolder e = values.next();
- value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis());
- value.getValue().putAll(e.getEvent());
- return value;
- }
-
- @Override public boolean next(NullWritable key, DruidWritable value) throws IOException {
- if (nextKeyValue()) {
- // Update value
- value.getValue().clear();
- EventHolder e = values.next();
- value.getValue().put(DruidConstants.DEFAULT_TIMESTAMP_COLUMN, e.getTimestamp().getMillis());
- value.getValue().putAll(e.getEvent());
- return true;
- }
- return false;
- }
-
- @Override public float getProgress() {
- return getQueryResultsIterator().hasNext() || values.hasNext() ? 0 : 1;
- }
-
-}
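Druid 0.17 removes the Select query type outright, which is why this reader is deleted rather than ported; Scan queries cover the same use case, and the test updates further down swap the SELECT_* constants for SCAN_* accordingly. For reference, a minimal scan request in the shape the q.out files below use (a sketch assuming the 0.17 scan API, with column names taken from the deleted select query):

    // Scan returns flat rows directly, so none of the EventHolder unwrapping
    // this reader performed over SelectResultValue is needed.
    String scanQuery = "{\"queryType\":\"scan\","
        + "\"dataSource\":\"wikipedia\","
        + "\"intervals\":[\"2013-01-01/2013-01-02\"],"
        + "\"resultFormat\":\"compactedList\","
        + "\"columns\":[\"robot\",\"page\",\"language\"]}";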
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
index 0b2072c..7d94f1a 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestDruidStorageHandler.java
@@ -93,12 +93,8 @@ public class TestDruidStorageHandler {
private DataSegment createSegment(String location, Interval interval, String version, ShardSpec shardSpec)
throws IOException {
FileUtils.writeStringToFile(new File(location), "dummySegmentData");
- return DataSegment.builder()
- .dataSource(DATA_SOURCE_NAME)
- .version(version)
- .interval(interval)
- .shardSpec(shardSpec)
- .loadSpec(ImmutableMap.of("path", location))
+ return DataSegment.builder().dataSource(DATA_SOURCE_NAME).version(version).interval(interval).shardSpec(shardSpec)
+ .loadSpec(ImmutableMap.of("path", location)).size(1000L)
.build();
}
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
index 58f4a44..2bcbb14 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/TestHiveDruidQueryBasedInputFormat.java
@@ -141,47 +141,6 @@ import org.junit.Test;
+ "\"context\":{\"queryId\":\"\"},"
+ "\"descending\":false}, [localhost:8082]}]";
- private static final String
- SELECT_QUERY =
- "{ \"queryType\": \"select\", "
- + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", "
- + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\","
- + "\"newpage\",\"user\"], "
- + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], "
- + " \"granularity\": \"all\", "
- + " \"intervals\": [ \"2013-01-01T00:00:00.000-08:00/2013-01-02T00:00:00.000-08:00\" ], "
- + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5}, "
- + " \"context\":{\"druid.query.fetch\":true}}";
- private static final String
- SELECT_QUERY_SPLIT =
- "[HiveDruidSplit{{\"queryType\":\"select\","
- + "\"dataSource\":{\"type\":\"table\",\"name\":\"wikipedia\"},"
- + "\"intervals\":{\"type\":\"LegacySegmentSpec\",\"intervals\":[\"2013-01-01T08:00:00"
- + ".000Z/2013-01-02T08:00:00.000Z\"]},"
- + "\"descending\":false,"
- + "\"filter\":null,"
- + "\"granularity\":{\"type\":\"all\"},"
- + "\"dimensions\":[{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"robot\",\"outputName\":\"robot\","
- + "\"outputType\":\"STRING\"},"
- + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"namespace\",\"outputName\":\"namespace\","
- + "\"outputType\":\"STRING\"},"
- + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"anonymous\",\"outputName\":\"anonymous\","
- + "\"outputType\":\"STRING\"},"
- + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"unpatrolled\",\"outputName\":\"unpatrolled\","
- + "\"outputType\":\"STRING\"},"
- + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"page\",\"outputName\":\"page\","
- + "\"outputType\":\"STRING\"},"
- + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"language\",\"outputName\":\"language\","
- + "\"outputType\":\"STRING\"},"
- + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"newpage\",\"outputName\":\"newpage\","
- + "\"outputType\":\"STRING\"},"
- + "{\"type\":\"LegacyDimensionSpec\",\"dimension\":\"user\",\"outputName\":\"user\","
- + "\"outputType\":\"STRING\"}],"
- + "\"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"],"
- + "\"virtualColumns\":[],"
- + "\"pagingSpec\":{\"pagingIdentifiers\":{},\"threshold\":5,\"fromNext\":false},"
- + "\"context\":{\"druid.query.fetch\":true,\"queryId\":\"\"}}, [localhost:8082]}]";
-
@Test
public void testTimeZone() throws Exception {
DruidQueryBasedInputFormat input = new DruidQueryBasedInputFormat();
@@ -202,9 +161,6 @@ import org.junit.Test;
resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
assertEquals(GROUP_BY_QUERY_SPLIT, Arrays.toString(resultSplits));
- conf = createPropertiesQuery("sample_datasource", Query.SELECT, SELECT_QUERY);
- resultSplits = (HiveDruidSplit[]) method1.invoke(input, conf);
- assertEquals(SELECT_QUERY_SPLIT, Arrays.toString(resultSplits));
}
private static Configuration createPropertiesQuery(String dataSource, String queryType, String jsonQuery) {
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
index edfcc65..b2a871e 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java
@@ -83,7 +83,6 @@ import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Row;
import org.apache.druid.query.Query;
import org.apache.druid.query.Result;
-import org.apache.druid.query.select.SelectResultValue;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.query.topn.TopNResultValue;
import org.junit.rules.ExpectedException;
@@ -92,7 +91,8 @@ import org.junit.rules.ExpectedException;
* Basic tests for Druid SerDe. The examples are taken from Druid 0.9.1.1
* documentation.
*/
-@SuppressWarnings({ "SameParameterValue", "SpellCheckingInspection" }) public class TestDruidSerDe {
+@SuppressWarnings({"SameParameterValue", "SpellCheckingInspection"})
+public class TestDruidSerDe {
// Timeseries query
private static final String
TIMESERIES_QUERY =
@@ -156,12 +156,12 @@ import org.junit.rules.ExpectedException;
new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)),
0L,
1.0F,
- 2.2222F },
+ 2.2222F},
new Object[]{
new TimestampTZ(Instant.ofEpochMilli(1325462400000L).atZone(ZoneOffset.UTC)),
2L,
3.32F,
- 4F } };
+ 4F}};
// Timeseries query results as records (types defined by metastore)
private static final String TIMESERIES_COLUMN_NAMES = "timestamp,sample_name1,sample_name2,sample_divide";
@@ -269,19 +269,19 @@ import org.junit.rules.ExpectedException;
private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][]{
new Object[]{
new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
- "dim1_val", 111L, 10669F, 96.11711711711712F },
+ "dim1_val", 111L, 10669F, 96.11711711711712F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- "another_dim1_val", 88L, 28344F, 322.09090909090907F },
+ "another_dim1_val", 88L, 28344F, 322.09090909090907F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- "dim1_val3", 70L, 871F, 12.442857142857143F },
+ "dim1_val3", 70L, 871F, 12.442857142857143F},
new Object[]{
new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)),
- "dim1_val4", 62L, 815F, 13.14516129032258F },
+ "dim1_val4", 62L, 815F, 13.14516129032258F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))),
- "dim1_val5", 60L, 2787F, 46.45F } };
+ "dim1_val5", 60L, 2787F, 46.45F}};
// TopN query results as records (types defined by metastore)
private static final String TOPN_COLUMN_NAMES = "timestamp,sample_dim,count,some_metric,sample_divide";
@@ -422,24 +422,24 @@ import org.junit.rules.ExpectedException;
private static final Object[][] GROUP_BY_QUERY_EXTRACTION_RESULTS_RECORDS = new Object[][]{
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))),
- (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 200L },
+ (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 200L},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
- (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 400L } };
+ (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 400L}};
private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][]{
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), "India",
- "phone", 88L, 29.91233453, 60.32F },
+ "phone", 88L, 29.91233453, 60.32F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))),
- "Spain", "pc", 16L, 172.93494959, 6.333333F } };
+ "Spain", "pc", 16L, 172.93494959, 6.333333F}};
private static final Object[][] GB_MONTH_EXTRACTION_RESULTS_RECORDS = new Object[][]{
new Object[]{
- (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 1, 200L },
+ (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 1, 200L},
new Object[]{
- (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 1, 400L } };
+ (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 1, 400L}};
// GroupBy query results as records (types defined by metastore)
private static final String GROUP_BY_COLUMN_NAMES = "timestamp,country,device,total_usage,data_transfer,avg_usage";
@@ -455,146 +455,14 @@ import org.junit.rules.ExpectedException;
private static final String GB_MONTH_EXTRACTIONS_COLUMN_NAMES = "timestamp,extract_month,$f1";
private static final String GB_MONTH_EXTRACTIONS_COLUMN_TYPES = "timestamp with local time zone,int,bigint";
- // Select query
- private static final String
- SELECT_QUERY =
- "{ \"queryType\": \"select\", "
- + " \"dataSource\": \"wikipedia\", \"descending\": \"false\", "
- + " \"dimensions\":[\"robot\",\"namespace\",\"anonymous\",\"unpatrolled\",\"page\",\"language\","
- + "\"newpage\",\"user\"], "
- + " \"metrics\":[\"count\",\"added\",\"delta\",\"variation\",\"deleted\"], "
- + " \"granularity\": \"all\", "
- + " \"intervals\": [ \"2013-01-01/2013-01-02\" ], "
- + " \"pagingSpec\":{\"pagingIdentifiers\": {}, \"threshold\":5} }";
-
- // Select query results
- private static final String
- SELECT_QUERY_RESULTS =
- "[{ "
- + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
- + " \"result\" : { "
- + " \"pagingIdentifiers\" : { "
- + " \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\" : 4 }, "
- + " \"events\" : [ { "
- + " \"segmentId\" : \"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00"
- + ".000Z_2013-01-10T08:13:47.830Z_v9\", "
- + " \"offset\" : 0, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
- + " \"robot\" : 1, "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"11._korpus_(NOVJ)\", "
- + " \"language\" : \"sl\", "
- + " \"newpage\" : \"0\", "
- + " \"user\" : \"EmausBot\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 39.0, "
- + " \"delta\" : 39.0, "
- + " \"variation\" : 39.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
- + ".830Z_v9\", "
- + " \"offset\" : 1, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:00.000Z\", "
- + " \"robot\" : 0, "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"112_U.S._580\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 70.0, "
- + " \"delta\" : 70.0, "
- + " \"variation\" : 70.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
- + ".830Z_v9\", "
- + " \"offset\" : 2, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
- + " \"robot\" : 0, "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"113_U.S._243\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 77.0, "
- + " \"delta\" : 77.0, "
- + " \"variation\" : 77.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
- + ".830Z_v9\", "
- + " \"offset\" : 3, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
- + " \"robot\" : 0, "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"113_U.S._73\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 70.0, "
- + " \"delta\" : 70.0, "
- + " \"variation\" : 70.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " }, { "
- + " \"segmentId\" : \"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47"
- + ".830Z_v9\", "
- + " \"offset\" : 4, "
- + " \"event\" : { "
- + " \"timestamp\" : \"2013-01-01T00:00:12.000Z\", "
- + " \"robot\" : 0, "
- + " \"namespace\" : \"article\", "
- + " \"anonymous\" : \"0\", "
- + " \"unpatrolled\" : \"0\", "
- + " \"page\" : \"113_U.S._756\", "
- + " \"language\" : \"en\", "
- + " \"newpage\" : \"1\", "
- + " \"user\" : \"MZMcBride\", "
- + " \"count\" : 1.0, "
- + " \"added\" : 68.0, "
- + " \"delta\" : 68.0, "
- + " \"variation\" : 68.0, "
- + " \"deleted\" : 0.0 "
- + " } "
- + " } ] }} ]";
-
- // Select query results as records (types defined by metastore)
- private static final String
- SELECT_COLUMN_NAMES =
+ private static final String SCAN_COLUMN_NAMES =
"__time,robot,namespace,anonymous,unpatrolled,page,language,newpage,user,count,added,delta,variation,deleted";
- private static final String
- SELECT_COLUMN_TYPES =
+ private static final String SCAN_COLUMN_TYPES =
"timestamp with local time zone,boolean,string,string,string,string,string,string,string,double,double,float,"
+ "float,float";
- private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][]{
- new Object[]{
- (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE,
- "article",
- "0",
- "0",
- "11._korpus_(NOVJ)",
- "sl",
- "0",
- "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F },
+ private static final Object[][] SCAN_QUERY_RESULTS_RECORDS = new Object[][]{
+ new Object[]{(new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE,
+ "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
"article",
@@ -603,7 +471,7 @@ import org.junit.rules.ExpectedException;
"112_U.S._580",
"en",
"1",
- "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F },
+ "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
"article",
@@ -612,7 +480,7 @@ import org.junit.rules.ExpectedException;
"113_U.S._243",
"en",
"1",
- "MZMcBride", 1.0d, 77.0d, 77.0F, 77.0F, 0.0F },
+ "MZMcBride", 1.0d, 77.0d, 77.0F, 77.0F, 0.0F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
"article",
@@ -621,7 +489,7 @@ import org.junit.rules.ExpectedException;
"113_U.S._73",
"en",
"1",
- "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F },
+ "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F},
new Object[]{
(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE,
"article",
@@ -630,7 +498,7 @@ import org.junit.rules.ExpectedException;
"113_U.S._756",
"en",
"1",
- "MZMcBride", 1.0d, 68.0d, 68.0F, 68.0F, 0.0F } };
+ "MZMcBride", 1.0d, 68.0d, 68.0F, 68.0F, 0.0F}};
// Scan query
private static final String
@@ -664,7 +532,8 @@ import org.junit.rules.ExpectedException;
+ ".0,68.0,68.0,68.0,0.0]"
+ "]}]";
- @Before public void setup() throws IOException {
+ @Before
+ public void setup() throws IOException {
tsQueryResults =
DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(
TIMESERIES_QUERY_RESULTS,
@@ -691,12 +560,6 @@ import org.junit.rules.ExpectedException;
GB_MONTH_EXTRACTIONS_RESULTS,
new TypeReference<List<Row>>() {
}));
- selectQueryResults =
- DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(
- SELECT_QUERY_RESULTS,
- new TypeReference<List<Result<SelectResultValue>>>() {
- }));
-
scanQueryResults =
DruidStorageHandlerUtils.SMILE_MAPPER.writeValueAsBytes(DruidStorageHandlerUtils.JSON_MAPPER.readValue(
SCAN_QUERY_RESULTS,
@@ -704,7 +567,8 @@ import org.junit.rules.ExpectedException;
}));
}
- @Test public void testDruidDeserializer()
+ @Test
+ public void testDruidDeserializer()
throws SerDeException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException,
IOException, InterruptedException, NoSuchMethodException, InvocationTargetException {
// Create, initialize, and test the SerDe
@@ -765,22 +629,18 @@ import org.junit.rules.ExpectedException;
GB_MONTH_EXTRACTIONS,
groupByMonthExtractQueryResults,
GB_MONTH_EXTRACTION_RESULTS_RECORDS);
- // Select query
- tbl = createPropertiesQuery("wikipedia", Query.SELECT, SELECT_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES);
- SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
- deserializeQueryResults(serDe, Query.SELECT, SELECT_QUERY, selectQueryResults, SELECT_QUERY_RESULTS_RECORDS);
// Scan query -- results should be the same as for the select query
- tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SELECT_COLUMN_NAMES, SELECT_COLUMN_TYPES);
+ tbl = createPropertiesQuery("wikipedia", Query.SCAN, SCAN_QUERY, SCAN_COLUMN_NAMES, SCAN_COLUMN_TYPES);
SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
- deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, SELECT_QUERY_RESULTS_RECORDS);
+ deserializeQueryResults(serDe, Query.SCAN, SCAN_QUERY, scanQueryResults, SCAN_QUERY_RESULTS_RECORDS);
}
private static Properties createPropertiesQuery(String dataSource,
- String queryType,
- String jsonQuery,
- String columnNames,
- String columnTypes) {
+ String queryType,
+ String jsonQuery,
+ String columnNames,
+ String columnTypes) {
Properties tbl = new Properties();
// Set the configuration parameters
@@ -792,11 +652,12 @@ import org.junit.rules.ExpectedException;
return tbl;
}
- @SuppressWarnings("unchecked") private void deserializeQueryResults(DruidSerDe serDe,
- String queryType,
- String jsonQuery,
- byte[] resultString,
- Object[][] records)
+ @SuppressWarnings("unchecked")
+ private void deserializeQueryResults(DruidSerDe serDe,
+ String queryType,
+ String jsonQuery,
+ byte[] resultString,
+ Object[][] records)
throws SerDeException, IOException, NoSuchFieldException, SecurityException, IllegalArgumentException,
IllegalAccessException, InterruptedException, NoSuchMethodException, InvocationTargetException {
@@ -886,7 +747,8 @@ import org.junit.rules.ExpectedException;
.put("__time_granularity", 1377907200000L)
.build());
- @Test public void testDruidObjectSerializer()
+ @Test
+ public void testDruidObjectSerializer()
throws SerDeException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException,
IOException, InterruptedException, NoSuchMethodException, InvocationTargetException {
// Create, initialize, and test the SerDe
@@ -899,9 +761,11 @@ import org.junit.rules.ExpectedException;
serializeObject(tbl, serDe, ROW_OBJECT, DRUID_WRITABLE);
}
- @Rule public ExpectedException expectedEx = ExpectedException.none();
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
- @Test public void testDruidObjectSerializerwithNullTimestamp() throws Exception {
+ @Test
+ public void testDruidObjectSerializerwithNullTimestamp() throws Exception {
// Create, initialize, and test the SerDe
DruidSerDe serDe = new DruidSerDe();
Configuration conf = new Configuration();
@@ -939,9 +803,9 @@ import org.junit.rules.ExpectedException;
}
private static void serializeObject(Properties properties,
- DruidSerDe serDe,
- Object[] rowObject,
- DruidWritable druidWritable) throws SerDeException {
+ DruidSerDe serDe,
+ Object[] rowObject,
+ DruidWritable druidWritable) throws SerDeException {
// Build OI with timestamp granularity column
final List<String> columnNames = new ArrayList<>(Utilities.getColumnNames(properties));
columnNames.add(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME);
@@ -991,7 +855,8 @@ import org.junit.rules.ExpectedException;
.put("c8", (byte) 0)
.build());
- @Test public void testDruidObjectDeserializer()
+ @Test
+ public void testDruidObjectDeserializer()
throws SerDeException, NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException,
IOException, InterruptedException, NoSuchMethodException, InvocationTargetException {
// Create, initialize, and test the SerDe
@@ -1004,9 +869,10 @@ import org.junit.rules.ExpectedException;
deserializeObject(serDe, ROW_OBJECT_2, DRUID_WRITABLE_2);
}
- @SuppressWarnings("unchecked") private static void deserializeObject(DruidSerDe serDe,
- Object[] rowObject,
- DruidWritable druidWritable) throws SerDeException {
+ @SuppressWarnings("unchecked")
+ private static void deserializeObject(DruidSerDe serDe,
+ Object[] rowObject,
+ DruidWritable druidWritable) throws SerDeException {
// Deserialize
List<Object> object = (List<Object>) serDe.deserialize(druidWritable);
// Check result
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
index 91b5f8b..3fb7bdf 100644
--- a/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
+++ b/druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java
@@ -150,29 +150,14 @@ import java.util.stream.Collectors;
RealtimeTuningConfig
tuningConfig =
new RealtimeTuningConfig(null,
- null,
- null,
- null,
- temporaryFolder.newFolder(),
- null,
- null,
- null,
- null,
- indexSpec,
- null,
- 0,
- 0,
- null,
- null,
- 0L,
- null,
- null);
+ null, null, null, temporaryFolder.newFolder(), null, null, null, null, indexSpec, null, null, 0, 0, null,
+ null, 0L, null, null);
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig() {
@Override public File getStorageDirectory() {
return segmentOutputDir;
}
- }, objectMapper);
+ });
Path
segmentDescriptorPath =
diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 9695486..fe6ad2e 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -37,6 +37,7 @@
<!-- dependencies are always listed in sorted order by groupId, artifactId -->
<properties>
<hive.path.to.root>../..</hive.path.to.root>
+ <druid.avatica.version>1.15.0</druid.avatica.version>
<druid.curator.version>4.0.0</druid.curator.version>
<druid.jersey.version>1.19.3</druid.jersey.version>
<druid.jetty.version>9.4.10.v20180503</druid.jetty.version>
@@ -44,6 +45,7 @@
<druid.guava.version>16.0.1</druid.guava.version>
<druid.guice.version>4.1.0</druid.guice.version>
<kafka.test.version>2.0.0</kafka.test.version>
+ <druid.guice.version>4.1.0</druid.guice.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
@@ -164,6 +166,26 @@
<artifactId>curator-recipes</artifactId>
<version>${druid.curator.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ <version>${avatica.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-core</artifactId>
+ <version>${avatica.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-metrics</artifactId>
+ <version>${avatica.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-server</artifactId>
+ <version>${avatica.version}</version>
+ </dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-bundle</artifactId>
diff --git a/pom.xml b/pom.xml
index 1432bcf..89c37e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,7 +146,7 @@
<derby.version>10.14.1.0</derby.version>
<dropwizard.version>3.1.0</dropwizard.version>
<dropwizard-metrics-hadoop-metrics2-reporter.version>0.1.2</dropwizard-metrics-hadoop-metrics2-reporter.version>
- <druid.version>0.14.0-incubating</druid.version>
+ <druid.version>0.17.1</druid.version>
<flatbuffers.version>1.6.0.1</flatbuffers.version>
<guava.version>19.0</guava.version>
<groovy.version>2.4.11</groovy.version>
diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
index f6a417b..2331fba 100644
--- a/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_delimited.q.out
@@ -120,7 +120,7 @@ POSTHOOK: query: Select page FROM druid_kafka_test_delimited
POSTHOOK: type: QUERY
POSTHOOK: Input: default@druid_kafka_test_delimited
POSTHOOK: Output: hdfs://### HDFS PATH ###
- "Gypsy Danger"
+"Gypsy Danger"
"Striker Eureka"
"Cherno Alpha"
"Crimson Typhoon"
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out b/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out
index 25abb74..0dd2295 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_semijoin_reduction_all_types.q.out
@@ -179,7 +179,7 @@ STAGE PLANS:
properties:
druid.fieldNames cstring1
druid.fieldTypes string
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cstring1","lower":"DynamicValue(RS_4_alltypesorc_small_cstring1_min)","upp [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cstring1","lower":"DynamicValue(RS_4_alltypesorc_small_cstr [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -340,7 +340,7 @@ STAGE PLANS:
properties:
druid.fieldNames ctinyint
druid.fieldTypes tinyint
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"ctinyint","lower":"DynamicValue(RS_4_alltypesorc_small_ctinyint_min)","upp [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"ctinyint","lower":"DynamicValue(RS_4_alltypesorc_small_ctin [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -501,7 +501,7 @@ STAGE PLANS:
properties:
druid.fieldNames csmallint
druid.fieldTypes smallint
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"csmallint","lower":"DynamicValue(RS_4_alltypesorc_small_csmallint_min)","u [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"csmallint","lower":"DynamicValue(RS_4_alltypesorc_small_csm [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -662,7 +662,7 @@ STAGE PLANS:
properties:
druid.fieldNames cint
druid.fieldTypes int
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cint","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper":"Dyn [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cint","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -823,7 +823,7 @@ STAGE PLANS:
properties:
druid.fieldNames cbigint
druid.fieldTypes bigint
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cbigint","lower":"DynamicValue(RS_4_alltypesorc_small_cbigint_min)","upper [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cbigint","lower":"DynamicValue(RS_4_alltypesorc_small_cbigi [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -984,7 +984,7 @@ STAGE PLANS:
properties:
druid.fieldNames cfloat
druid.fieldTypes float
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"not","field":{"type":"selector","dimension":"cfloat","value":null,"extractionFn":null}},"columns":["cfloat"],"legacy":null,"context":null,"des [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"not","field":{"type":"selector","dimension":"cfloat","value":null,"extractionFn":null}},"columns":["cfloat"],"legacy":null,"con [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -1145,7 +1145,7 @@ STAGE PLANS:
properties:
druid.fieldNames cdouble
druid.fieldTypes double
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdouble","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)","upper [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdouble","lower":"DynamicValue(RS_4_alltypesorc_small_cdoub [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -1295,7 +1295,7 @@ STAGE PLANS:
properties:
druid.fieldNames vc
druid.fieldTypes timestamp with local time zone
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":null,"columns":["vc"],"legacy":null,"context":null,"descending":false,"g [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[{"type":"expression","name":"vc","expression":"\"__time\"","outputType":"LONG"}],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":null,"columns":["vc"],"legacy":null,"context":null,"desce [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 348640 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -1456,7 +1456,7 @@ STAGE PLANS:
properties:
druid.fieldNames cboolean1
druid.fieldTypes boolean
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"not","field":{"type":"selector","dimension":"cboolean1","value":null,"extractionFn":null}},"columns":["cboolean1"],"legacy":null,"context":nul [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"not","field":{"type":"selector","dimension":"cboolean1","value":null,"extractionFn":null}},"columns":["cboolean1"],"legacy":nul [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 34864 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -1617,7 +1617,7 @@ STAGE PLANS:
properties:
druid.fieldNames cintstring
druid.fieldTypes string
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cintstring","lower":"DynamicValue(RS_4_alltypesorc_small_cint_min)","upper [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cintstring","lower":"DynamicValue(RS_4_alltypesorc_small_ci [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -1778,7 +1778,7 @@ STAGE PLANS:
properties:
druid.fieldNames cdoublestring
druid.fieldTypes string
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdoublestring","lower":"DynamicValue(RS_4_alltypesorc_small_cdouble_min)", [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cdoublestring","lower":"DynamicValue(RS_4_alltypesorc_small [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
Filter Operator
@@ -1939,7 +1939,7 @@ STAGE PLANS:
properties:
druid.fieldNames cfloatstring
druid.fieldTypes string
- druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cfloatstring","lower":"DynamicValue(RS_4_alltypesorc_small_cfloat_min)","u [...]
+ druid.query.json {"queryType":"scan","dataSource":{"type":"table","name":"default.druid_table_alltypesorc"},"intervals":{"type":"LegacySegmentSpec","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"]},"virtualColumns":[],"resultFormat":"compactedList","batchSize":20480,"limit":9223372036854775807,"order":"none","filter":{"type":"and","fields":[{"type":"and","fields":[{"type":"bound","dimension":"cfloatstring","lower":"DynamicValue(RS_4_alltypesorc_small_ [...]
druid.query.type scan
Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
Filter Operator
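Note on the pattern in the hunks above: every regenerated scan query now carries an explicit "order":"none" entry, which the upgraded Druid (0.17.1) serializes even for scan queries that request no time ordering; the pre-upgrade plans on the "-" lines omitted the key entirely. A minimal, self-contained sketch of that difference, using a plain Jackson ObjectMapper (the handler's own mapper is not assumed here) and a query JSON abridged from the plans above:

    // Illustrative only: shows the explicit "order" field that the upgraded
    // Druid emits in serialized scan queries. The JSON is abridged; the real
    // plans also carry intervals, filters, limit, and context.
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class ScanOrderCheck {
      public static void main(String[] args) throws Exception {
        String json = "{\"queryType\":\"scan\","
            + "\"dataSource\":{\"type\":\"table\",\"name\":\"default.druid_table_alltypesorc\"},"
            + "\"resultFormat\":\"compactedList\",\"batchSize\":20480,"
            + "\"order\":\"none\",\"columns\":[\"cfloat\"]}";
        Map<?, ?> query = new ObjectMapper().readValue(json, Map.class);
        // Prints "none"; parsing a pre-upgrade query JSON would yield null here,
        // since those plans did not serialize the key at all.
        System.out.println(query.get("order"));
      }
    }

Because the field is now written even when it holds the default value, every golden file that embeds a serialized scan query has to be regenerated, which accounts for the otherwise identical one-line churn in each hunk.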