You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/06/15 13:53:22 UTC

nifi git commit: NIFI-5188: DruidTranquilityController does not fully support Druid aggregator

Repository: nifi
Updated Branches:
  refs/heads/master 52d6b9cfa -> 9b461027a


NIFI-5188: DruidTranquilityController does not fully support Druid aggregator

Rollback Druid 0.9.2 to 0.9.1

Fixed checkstyle error

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2696


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b461027
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b461027
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b461027

Branch: refs/heads/master
Commit: 9b461027a4e9940f7ade40e9169fc926f9289eca
Parents: 52d6b9c
Author: Dongkyu Hwangbo <hw...@gmail.com>
Authored: Mon May 14 16:33:49 2018 +0900
Committer: Matthew Burgess <ma...@apache.org>
Committed: Fri Jun 15 09:52:14 2018 -0400

----------------------------------------------------------------------
 .../nifi-druid-controller-service/pom.xml       | 12 ++-
 .../druid/DruidTranquilityController.java       | 81 ++++++--------------
 nifi-nar-bundles/nifi-druid-bundle/pom.xml      |  2 +-
 3 files changed, 34 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
index 2f52691..139aad8 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
@@ -47,10 +47,20 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>io.druid.extensions</groupId>
+            <artifactId>druid-histogram</artifactId>
+            <version>${druid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.druid.extensions</groupId>
+            <artifactId>druid-datasketches</artifactId>
+            <version>${druid.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.7.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 158579a..5d617f7 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -24,11 +24,17 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.AggregatorsModule;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.query.aggregation.datasketches.theta.SketchModule;
+import io.druid.query.aggregation.histogram.ApproximateHistogramDruidModule;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -47,7 +53,6 @@ import org.apache.nifi.controller.api.druid.DruidTranquilityService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.codehaus.jackson.map.ObjectMapper;
 
 import com.metamx.common.Granularity;
 import com.metamx.tranquility.beam.Beam;
@@ -62,15 +67,7 @@ import com.metamx.tranquility.tranquilizer.Tranquilizer;
 import com.metamx.tranquility.typeclass.Timestamper;
 
 import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularity;
 import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.CountAggregatorFactory;
-import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
-import io.druid.query.aggregation.DoubleMinAggregatorFactory;
-import io.druid.query.aggregation.DoubleSumAggregatorFactory;
-import io.druid.query.aggregation.LongMaxAggregatorFactory;
-import io.druid.query.aggregation.LongMinAggregatorFactory;
-import io.druid.query.aggregation.LongSumAggregatorFactory;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 
@@ -518,56 +515,22 @@ public class DruidTranquilityController extends AbstractControllerService implem
     }
 
     private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) {
-        List<AggregatorFactory> aggregatorList = new LinkedList<>();
-        List<Map<String, String>> aggregatorInfo = parseJsonString(aggregatorJSON);
-        for (Map<String, String> aggregator : aggregatorInfo) {
-
-            if (aggregator.get("type").equalsIgnoreCase("count")) {
-                aggregatorList.add(getCountAggregator(aggregator));
-            } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) {
-                aggregatorList.add(getDoubleSumAggregator(aggregator));
-            } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) {
-                aggregatorList.add(getDoubleMaxAggregator(aggregator));
-            } else if (aggregator.get("type").equalsIgnoreCase("doublemin")) {
-                aggregatorList.add(getDoubleMinAggregator(aggregator));
-            } else if (aggregator.get("type").equalsIgnoreCase("longsum")) {
-                aggregatorList.add(getLongSumAggregator(aggregator));
-            } else if (aggregator.get("type").equalsIgnoreCase("longmax")) {
-                aggregatorList.add(getLongMaxAggregator(aggregator));
-            } else if (aggregator.get("type").equalsIgnoreCase("longmin")) {
-                aggregatorList.add(getLongMinAggregator(aggregator));
-            }
-        }
-
-        return aggregatorList;
-    }
-
-    private AggregatorFactory getLongMinAggregator(Map<String, String> map) {
-        return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName"));
-    }
-
-    private AggregatorFactory getLongMaxAggregator(Map<String, String> map) {
-        return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
-    }
-
-    private AggregatorFactory getLongSumAggregator(Map<String, String> map) {
-        return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName"));
-    }
-
-    private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) {
-        return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName"));
-    }
-
-    private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) {
-        return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
-    }
-
-    private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) {
-        return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName"));
-    }
+        ComponentLog log = getLogger();
+        ObjectMapper mapper = new ObjectMapper(null);
+        mapper.registerModule(new AggregatorsModule());
+        mapper.registerModules(Lists.newArrayList(new SketchModule().getJacksonModules()));
+        mapper.registerModules(Lists.newArrayList(new ApproximateHistogramDruidModule().getJacksonModules()));
 
-    private AggregatorFactory getCountAggregator(Map<String, String> map) {
-        return new CountAggregatorFactory(map.get("name"));
+        try {
+            return mapper.readValue(
+                aggregatorJSON,
+                new TypeReference<List<AggregatorFactory>>() {
+                }
+            );
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+            return null;
+        }
     }
 
     private Granularity getGranularity(String granularityString) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
index 6543e7f..3deee9d 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
@@ -52,4 +52,4 @@
         <module>nifi-druid-controller-service</module>
         <module>nifi-druid-processors</module>
     </modules>
-</project>
\ No newline at end of file
+</project>