You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/04/03 16:39:40 UTC
[nifi] branch master updated: NIFI-6180: exposing firehose grace period to DruidTranquilityController
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a63de2 NIFI-6180: exposing firehose grace period to DruidTranquilityController
3a63de2 is described below
commit 3a63de2ae2debea95244bc5c424d8669d5833cd9
Author: Endre Zoltan Kovacs <ek...@hortonworks.com>
AuthorDate: Wed Apr 3 16:35:04 2019 +0200
NIFI-6180: exposing firehose grace period to DruidTranquilityController
This allows for configuring `druidBeam.firehoseGracePeriod`
(see https://github.com/druid-io/tranquility/blob/master/docs/configuration.md#properties).
NIFI-6180: Corrected typo in DruidTranquilityController
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3403
---
.../controller/druid/DruidTranquilityController.java | 18 ++++++++++++++++--
.../druid/MockDruidTranquilityController.java | 2 +-
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 0690264..78aab4c 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -80,6 +80,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
private final static String FIREHOSE_PATTERN = "druid:firehose:%s";
private final static AllowableValue PT1M = new AllowableValue("PT1M", "1 minute", "1 minute");
+ private final static AllowableValue PT5M = new AllowableValue("PT5M", "5 minutes", "5 minutes");
private final static AllowableValue PT10M = new AllowableValue("PT10M", "10 minutes", "10 minutes");
private final static AllowableValue PT60M = new AllowableValue("PT60M", "60 minutes", "1 hour");
@@ -276,6 +277,16 @@ public class DruidTranquilityController extends AbstractControllerService implem
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor FIREHOSE_GRACE_PERIOD = new PropertyDescriptor.Builder()
+ .name("druid-cs-firehose-grace-period")
+ .displayName("Firehose Grace Period")
+ .description("An additional grace period, after the \"Late Event Grace Period\" (window period) has elapsed, but before the indexing task is shut down.")
+ .required(true)
+ .allowableValues(PT1M, PT5M, PT10M, PT60M)
+ .defaultValue(PT5M.getValue())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("druid-cs-batch-size")
.displayName("Batch Size")
@@ -332,6 +343,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
props.add(QUERY_GRANULARITY);
props.add(INDEX_RETRY_PERIOD);
props.add(WINDOW_PERIOD);
+ props.add(FIREHOSE_GRACE_PERIOD);
props.add(TIMESTAMP_FIELD);
props.add(MAX_BATCH_SIZE);
props.add(MAX_PENDING_BATCHES);
@@ -378,6 +390,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue();
final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue();
final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue();
+ final String firehoseGracePeriod = context.getProperty(FIREHOSE_GRACE_PERIOD).getValue();
final String indexRetryPeriod = context.getProperty(INDEX_RETRY_PERIOD).getValue();
final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).evaluateAttributeExpressions().getValue();
final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).evaluateAttributeExpressions().getValue();
@@ -416,7 +429,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null);
final Beam<Map<String, Object>> beam = buildBeam(dataSource, indexService, discoveryPath, clusterPartitions, clusterReplication,
- segmentGranularity, queryGranularity, windowPeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
+ segmentGranularity, queryGranularity, windowPeriod, firehoseGracePeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
tranquilizer = buildTranquilizer(maxBatchSize, maxPendingBatches, lingerMillis, beam);
@@ -433,7 +446,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
}
Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
- String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions,
+ String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions,
List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
return DruidBeams.builder(timestamper)
.curator(curator)
@@ -454,6 +467,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
DruidBeamConfig
.builder()
.indexRetryPeriod(new Period(indexRetryPeriod))
+ .firehoseGracePeriod(new Period(firehoseGracePeriod))
.build())
.buildBeam();
}
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
index 695212c..cd77d11 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
@@ -137,7 +137,7 @@ public class MockDruidTranquilityController extends DruidTranquilityController {
@SuppressWarnings("unchecked")
@Override
Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
- String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions,
+ String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions,
List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
return mock(Beam.class);
}