You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/04/03 16:39:40 UTC

[nifi] branch master updated: NIFI-6180: exposing firehose grace period to DruidTranquilityController

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a63de2  NIFI-6180: exposing firehose grace period to DruidTranquilityController
3a63de2 is described below

commit 3a63de2ae2debea95244bc5c424d8669d5833cd9
Author: Endre Zoltan Kovacs <ek...@hortonworks.com>
AuthorDate: Wed Apr 3 16:35:04 2019 +0200

    NIFI-6180: exposing firehose grace period to DruidTranquilityController
    
    This allows configuring Tranquility's `druidBeam.firehoseGracePeriod` property
    (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md#properties)
    
    NIFI-6180: Corrected typo in DruidTranquilityController
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3403
---
 .../controller/druid/DruidTranquilityController.java   | 18 ++++++++++++++++--
 .../druid/MockDruidTranquilityController.java          |  2 +-
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 0690264..78aab4c 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -80,6 +80,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
     private final static String FIREHOSE_PATTERN = "druid:firehose:%s";
 
     private final static AllowableValue PT1M = new AllowableValue("PT1M", "1 minute", "1 minute");
+    private final static AllowableValue PT5M = new AllowableValue("PT5M", "5 minutes", "5 minutes");
     private final static AllowableValue PT10M = new AllowableValue("PT10M", "10 minutes", "10 minutes");
     private final static AllowableValue PT60M = new AllowableValue("PT60M", "60 minutes", "1 hour");
 
@@ -276,6 +277,16 @@ public class DruidTranquilityController extends AbstractControllerService implem
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor FIREHOSE_GRACE_PERIOD = new PropertyDescriptor.Builder()
+            .name("druid-cs-firehose-grace-period")
+            .displayName("Firehose Grace Period")
+            .description("An additional grace period, after the \"Late Event Grace Period\" (window period) has elapsed, but before the indexing task is shut down.")
+            .required(true)
+            .allowableValues(PT1M, PT5M, PT10M, PT60M)
+            .defaultValue(PT5M.getValue())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
             .name("druid-cs-batch-size")
             .displayName("Batch Size")
@@ -332,6 +343,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
         props.add(QUERY_GRANULARITY);
         props.add(INDEX_RETRY_PERIOD);
         props.add(WINDOW_PERIOD);
+        props.add(FIREHOSE_GRACE_PERIOD);
         props.add(TIMESTAMP_FIELD);
         props.add(MAX_BATCH_SIZE);
         props.add(MAX_PENDING_BATCHES);
@@ -378,6 +390,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
         final String segmentGranularity = context.getProperty(SEGMENT_GRANULARITY).getValue();
         final String queryGranularity = context.getProperty(QUERY_GRANULARITY).getValue();
         final String windowPeriod = context.getProperty(WINDOW_PERIOD).getValue();
+        final String firehoseGracePeriod = context.getProperty(FIREHOSE_GRACE_PERIOD).getValue();
         final String indexRetryPeriod = context.getProperty(INDEX_RETRY_PERIOD).getValue();
         final String aggregatorJSON = context.getProperty(AGGREGATOR_JSON).evaluateAttributeExpressions().getValue();
         final String dimensionsStringList = context.getProperty(DIMENSIONS_LIST).evaluateAttributeExpressions().getValue();
@@ -416,7 +429,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
         final TimestampSpec timestampSpec = new TimestampSpec(timestampField, "auto", null);
 
         final Beam<Map<String, Object>> beam = buildBeam(dataSource, indexService, discoveryPath, clusterPartitions, clusterReplication,
-                segmentGranularity, queryGranularity, windowPeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
+                segmentGranularity, queryGranularity, windowPeriod, firehoseGracePeriod, indexRetryPeriod, dimensions, aggregator, timestamper, timestampSpec);
 
         tranquilizer = buildTranquilizer(maxBatchSize, maxPendingBatches, lingerMillis, beam);
 
@@ -433,7 +446,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
     }
 
     Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
-                                        String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions,
+                                        String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions,
                                         List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
         return DruidBeams.builder(timestamper)
                 .curator(curator)
@@ -454,6 +467,7 @@ public class DruidTranquilityController extends AbstractControllerService implem
                         DruidBeamConfig
                                 .builder()
                                 .indexRetryPeriod(new Period(indexRetryPeriod))
+                                .firehoseGracePeriod(new Period(firehoseGracePeriod))
                                 .build())
                 .buildBeam();
     }
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
index 695212c..cd77d11 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-processors/src/test/java/org/apache/nifi/controller/druid/MockDruidTranquilityController.java
@@ -137,7 +137,7 @@ public class MockDruidTranquilityController extends DruidTranquilityController {
     @SuppressWarnings("unchecked")
     @Override
     Beam<Map<String, Object>> buildBeam(String dataSource, String indexService, String discoveryPath, int clusterPartitions, int clusterReplication,
-                                        String segmentGranularity, String queryGranularity, String windowPeriod, String indexRetryPeriod, List<String> dimensions,
+                                        String segmentGranularity, String queryGranularity, String windowPeriod, String firehoseGracePeriod, String indexRetryPeriod, List<String> dimensions,
                                         List<AggregatorFactory> aggregator, Timestamper<Map<String, Object>> timestamper, TimestampSpec timestampSpec) {
         return mock(Beam.class);
     }