You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/06/15 13:53:22 UTC
nifi git commit: NIFI-5188: DruidTranquilityController does not fully support Druid aggregator
Repository: nifi
Updated Branches:
refs/heads/master 52d6b9cfa -> 9b461027a
NIFI-5188: DruidTranquilityController does not fully support Druid aggregator
Rollback Druid 0.9.2 to 0.9.1
Fixed checkstyle error
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2696
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b461027
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b461027
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b461027
Branch: refs/heads/master
Commit: 9b461027a4e9940f7ade40e9169fc926f9289eca
Parents: 52d6b9c
Author: Dongkyu Hwangbo <hw...@gmail.com>
Authored: Mon May 14 16:33:49 2018 +0900
Committer: Matthew Burgess <ma...@apache.org>
Committed: Fri Jun 15 09:52:14 2018 -0400
----------------------------------------------------------------------
.../nifi-druid-controller-service/pom.xml | 12 ++-
.../druid/DruidTranquilityController.java | 81 ++++++--------------
nifi-nar-bundles/nifi-druid-bundle/pom.xml | 2 +-
3 files changed, 34 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
index 2f52691..139aad8 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/pom.xml
@@ -47,10 +47,20 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>druid-histogram</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.druid.extensions</groupId>
+ <artifactId>druid-datasketches</artifactId>
+ <version>${druid.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.7.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
index 158579a..5d617f7 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
+++ b/nifi-nar-bundles/nifi-druid-bundle/nifi-druid-controller-service/src/main/java/org/apache/nifi/controller/druid/DruidTranquilityController.java
@@ -24,11 +24,17 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.AggregatorsModule;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.query.aggregation.datasketches.theta.SketchModule;
+import io.druid.query.aggregation.histogram.ApproximateHistogramDruidModule;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -47,7 +53,6 @@ import org.apache.nifi.controller.api.druid.DruidTranquilityService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
-import org.codehaus.jackson.map.ObjectMapper;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
@@ -62,15 +67,7 @@ import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.data.input.impl.TimestampSpec;
-import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.query.aggregation.CountAggregatorFactory;
-import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
-import io.druid.query.aggregation.DoubleMinAggregatorFactory;
-import io.druid.query.aggregation.DoubleSumAggregatorFactory;
-import io.druid.query.aggregation.LongMaxAggregatorFactory;
-import io.druid.query.aggregation.LongMinAggregatorFactory;
-import io.druid.query.aggregation.LongSumAggregatorFactory;
import org.joda.time.DateTime;
import org.joda.time.Period;
@@ -518,56 +515,22 @@ public class DruidTranquilityController extends AbstractControllerService implem
}
private List<AggregatorFactory> getAggregatorList(String aggregatorJSON) {
- List<AggregatorFactory> aggregatorList = new LinkedList<>();
- List<Map<String, String>> aggregatorInfo = parseJsonString(aggregatorJSON);
- for (Map<String, String> aggregator : aggregatorInfo) {
-
- if (aggregator.get("type").equalsIgnoreCase("count")) {
- aggregatorList.add(getCountAggregator(aggregator));
- } else if (aggregator.get("type").equalsIgnoreCase("doublesum")) {
- aggregatorList.add(getDoubleSumAggregator(aggregator));
- } else if (aggregator.get("type").equalsIgnoreCase("doublemax")) {
- aggregatorList.add(getDoubleMaxAggregator(aggregator));
- } else if (aggregator.get("type").equalsIgnoreCase("doublemin")) {
- aggregatorList.add(getDoubleMinAggregator(aggregator));
- } else if (aggregator.get("type").equalsIgnoreCase("longsum")) {
- aggregatorList.add(getLongSumAggregator(aggregator));
- } else if (aggregator.get("type").equalsIgnoreCase("longmax")) {
- aggregatorList.add(getLongMaxAggregator(aggregator));
- } else if (aggregator.get("type").equalsIgnoreCase("longmin")) {
- aggregatorList.add(getLongMinAggregator(aggregator));
- }
- }
-
- return aggregatorList;
- }
-
- private AggregatorFactory getLongMinAggregator(Map<String, String> map) {
- return new LongMinAggregatorFactory(map.get("name"), map.get("fieldName"));
- }
-
- private AggregatorFactory getLongMaxAggregator(Map<String, String> map) {
- return new LongMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
- }
-
- private AggregatorFactory getLongSumAggregator(Map<String, String> map) {
- return new LongSumAggregatorFactory(map.get("name"), map.get("fieldName"));
- }
-
- private AggregatorFactory getDoubleMinAggregator(Map<String, String> map) {
- return new DoubleMinAggregatorFactory(map.get("name"), map.get("fieldName"));
- }
-
- private AggregatorFactory getDoubleMaxAggregator(Map<String, String> map) {
- return new DoubleMaxAggregatorFactory(map.get("name"), map.get("fieldName"));
- }
-
- private AggregatorFactory getDoubleSumAggregator(Map<String, String> map) {
- return new DoubleSumAggregatorFactory(map.get("name"), map.get("fieldName"));
- }
+ ComponentLog log = getLogger();
+ ObjectMapper mapper = new ObjectMapper(null);
+ mapper.registerModule(new AggregatorsModule());
+ mapper.registerModules(Lists.newArrayList(new SketchModule().getJacksonModules()));
+ mapper.registerModules(Lists.newArrayList(new ApproximateHistogramDruidModule().getJacksonModules()));
- private AggregatorFactory getCountAggregator(Map<String, String> map) {
- return new CountAggregatorFactory(map.get("name"));
+ try {
+ return mapper.readValue(
+ aggregatorJSON,
+ new TypeReference<List<AggregatorFactory>>() {
+ }
+ );
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ return null;
+ }
}
private Granularity getGranularity(String granularityString) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/9b461027/nifi-nar-bundles/nifi-druid-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-druid-bundle/pom.xml b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
index 6543e7f..3deee9d 100644
--- a/nifi-nar-bundles/nifi-druid-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-druid-bundle/pom.xml
@@ -52,4 +52,4 @@
<module>nifi-druid-controller-service</module>
<module>nifi-druid-processors</module>
</modules>
-</project>
\ No newline at end of file
+</project>