Posted to commits@druid.apache.org by hi...@apache.org on 2019/05/23 18:11:55 UTC
[incubator-druid] branch master updated: Adding influxdb emitter as
a contrib extension (#7717)
This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6964ac2 Adding influxdb emitter as a contrib extension (#7717)
6964ac2 is described below
commit 6964ac23a2a3b95f29a2d4aa443ef9fcbc84dfdb
Author: awelsh93 <32...@users.noreply.github.com>
AuthorDate: Thu May 23 19:11:48 2019 +0100
Adding influxdb emitter as a contrib extension (#7717)
* Adding influxdb emitter as a contrib extension
* addressing code review comments
---
distribution/pom.xml | 2 +
.../extensions-contrib/influxdb-emitter.md | 75 ++++++++
docs/content/development/extensions.md | 1 +
extensions-contrib/influxdb-emitter/pom.xml | 74 +++++++
.../druid/emitter/influxdb/InfluxdbEmitter.java | 214 +++++++++++++++++++++
.../emitter/influxdb/InfluxdbEmitterConfig.java | 196 +++++++++++++++++++
.../emitter/influxdb/InfluxdbEmitterModule.java | 61 ++++++
.../org.apache.druid.initialization.DruidModule | 16 ++
.../influxdb/InfluxdbEmitterConfigTest.java | 212 ++++++++++++++++++++
.../emitter/influxdb/InfluxdbEmitterTest.java | 208 ++++++++++++++++++++
pom.xml | 1 +
11 files changed, 1060 insertions(+)
diff --git a/distribution/pom.xml b/distribution/pom.xml
index e25b3c5..4ec9b26 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -324,6 +324,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-influx-extensions</argument>
<argument>-c</argument>
+ <argument>org.apache.druid.extensions.contrib:druid-influxdb-emitter</argument>
+ <argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-kafka-eight-simple-consumer</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:kafka-emitter</argument>
diff --git a/docs/content/development/extensions-contrib/influxdb-emitter.md b/docs/content/development/extensions-contrib/influxdb-emitter.md
new file mode 100644
index 0000000..138a0bb
--- /dev/null
+++ b/docs/content/development/extensions-contrib/influxdb-emitter.md
@@ -0,0 +1,75 @@
+---
+layout: doc_page
+title: "InfluxDB Emitter"
+---
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+# InfluxDB Emitter
+
+To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) the `druid-influxdb-emitter` extension.
+
+## Introduction
+
+This extension emits Druid metrics to [InfluxDB](https://www.influxdata.com/time-series-platform/influxdb/) over HTTP. Currently this emitter only emits service metric events to InfluxDB (see [Druid metrics](../../operations/metrics.html) for a list of metrics).
+When a metric event is fired, it is added to a queue of events. After a configurable amount of time, the events on the queue are transformed to InfluxDB's line protocol
+and POSTed to the InfluxDB HTTP API. The entire queue is flushed at this point. The queue is also flushed when the emitter is shut down.
+
+Note that authentication and authorization must be [enabled](https://docs.influxdata.com/influxdb/v1.7/administration/authentication_and_authorization/) on the InfluxDB server.
+
+## Configuration
+
+All the configuration parameters for the InfluxDB emitter are under `druid.emitter.influxdb`.
+
+|Property|Description|Required?|Default|
+|--------|-----------|---------|-------|
+|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A|
+|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086|
+|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A|
+|`druid.emitter.influxdb.maxQueueSize`|The maximum number of events the queue can hold.|No|`Integer.MAX_VALUE` (2^31-1)|
+|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000|
+|`druid.emitter.influxdb.flushDelay`|How long (in milliseconds) to wait before the first scheduled flush runs.|No|60000|
+|`druid.emitter.influxdb.influxdbUserName`|The username for authenticating with the InfluxDB database.|Yes|N/A|
+|`druid.emitter.influxdb.influxdbPassword`|The password for authenticating with the InfluxDB database.|Yes|N/A|
+|`druid.emitter.influxdb.dimensionWhitelist`|A whitelist of metric dimensions to include as tags.|No|`["dataSource","type","numMetrics","numDimensions","threshold","dimension","taskType","taskStatus","tier"]`|
+
+## InfluxDB Line Protocol
+
+This section gives an example of how this emitter converts a Druid metric event into InfluxDB's [line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_reference/).
+
+The syntax of the line protocol is:
+
+`<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]`
+
+where timestamp is in nanoseconds since epoch.
+
+A typical service metric event as recorded by Druid's logging emitter is: `Event [{"feed":"metrics","timestamp":"2017-10-31T09:09:06.857Z","service":"druid/historical","host":"historical001:8083","version":"0.11.0-SNAPSHOT","metric":"query/cache/total/hits","value":34787256}]`.
+
+This event is parsed into line protocol according to these rules:
+
+* The measurement becomes `druid_query`, since `query` is the first part of the metric.
+* The tags are `service=druid/historical`, `metric=druid_cache_total` and `hostname=historical001`. (The metric tag is the middle part of the Druid metric name, joined with `_` and prefixed with `druid_`. If an event has `metric=query/time`, for example, there is no middle part and hence no metric tag.)
+* The field is `druid_hits`, since `hits` is the last part of the metric.
+
+This gives the following string, which can be POSTed to InfluxDB: `druid_query,service=druid/historical,metric=druid_cache_total,hostname=historical001 druid_hits=34787256 1509440946857000000`
+
+The InfluxDB emitter has a whitelist of dimensions.
+If a metric event carries a dimension from the whitelist, that dimension is added as a tag to the line protocol string.
+The value of the dimension is sanitized such that every run of dots or whitespace is replaced with a single `_`.
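
The transformation rules documented above can be illustrated with a small standalone sketch. It is not the code from this commit: the class name `LineProtocolSketch`, the method `toLineProtocol`, and the guard against single-part metric names are assumptions made for illustration, and it has no Druid dependencies. Tags are emitted in the order used by `transformForInfluxSystems` in this commit (service, metric, hostname), and the trailing newline the emitter appends to each event is omitted.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical standalone sketch of the documented mapping; not part of the commit.
class LineProtocolSketch
{
  // Same pattern as the emitter: a run of whitespace or a run of dots.
  private static final Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+");

  // Replaces each run of dots or whitespace in a dimension value with one underscore.
  public static String sanitize(String value)
  {
    return DOT_OR_WHITESPACE.matcher(value).replaceAll("_");
  }

  public static String toLineProtocol(String service, String host, String metric, Number value, long timestampMillis)
  {
    String[] parts = metric.split("/");
    // Assumption for this sketch: reject metric names without a "/" up front.
    if (parts.length < 2) {
      throw new IllegalArgumentException("expected at least two metric parts: " + metric);
    }
    // Middle parts (if any) become the metric tag, joined with "_".
    String middle = String.join("_", Arrays.copyOfRange(parts, 1, parts.length - 1));

    StringBuilder line = new StringBuilder("druid_").append(parts[0]); // measurement
    line.append(",service=").append(service);
    if (parts.length > 2) {
      line.append(",metric=druid_").append(middle);
    }
    line.append(",hostname=").append(host.split(":")[0]); // drop the port
    line.append(" druid_").append(parts[parts.length - 1]).append('=').append(value); // field
    line.append(' ').append(timestampMillis * 1_000_000L); // milliseconds to nanoseconds
    return line.toString();
  }

  public static void main(String[] args)
  {
    // 2017-10-31T09:09:06.857Z expressed as epoch milliseconds.
    long millis = 1509440946857L;
    System.out.println(
        toLineProtocol("druid/historical", "historical001:8083", "query/cache/total/hits", 34787256, millis)
    );
    // prints: druid_query,service=druid/historical,metric=druid_cache_total,hostname=historical001 druid_hits=34787256 1509440946857000000
    System.out.println(sanitize("wiki pedia.edits")); // prints: wiki_pedia_edits
  }
}
```

A two-part metric such as `query/time` has no middle part, so the sketch emits no `metric` tag for it, matching the rule described in the documentation.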
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 5112ee9..c58044c 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -96,6 +96,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
|druid-opentsdb-emitter|OpenTSDB metrics emitter |[link](../development/extensions-contrib/opentsdb-emitter.html)|
|druid-moving-average-query|Support for [Moving Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate [Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions) in Druid queries.|[link](../development/extensions-contrib/moving-average-query.html)|
+|druid-influxdb-emitter|InfluxDB metrics emitter|[link](../development/extensions-contrib/influxdb-emitter.html)|
## Promoting Community Extension to Core Extension
diff --git a/extensions-contrib/influxdb-emitter/pom.xml b/extensions-contrib/influxdb-emitter/pom.xml
new file mode 100644
index 0000000..3605fd0
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <groupId>org.apache.druid.extensions.contrib</groupId>
+ <artifactId>druid-influxdb-emitter</artifactId>
+ <name>druid-influxdb-emitter</name>
+ <description>influxdb-emitter</description>
+
+ <parent>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid</artifactId>
+ <version>0.15.0-incubating-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-core</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>pl.pragmatists</groupId>
+ <artifactId>JUnitParams</artifactId>
+ <version>1.0.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.3.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java
new file mode 100644
index 0000000..93f7bc2
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.core.Emitter;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+
+public class InfluxdbEmitter implements Emitter
+{
+
+ private static final Logger log = new Logger(InfluxdbEmitter.class);
+ private final HttpClient influxdbClient;
+ private final InfluxdbEmitterConfig influxdbEmitterConfig;
+ private final AtomicBoolean started = new AtomicBoolean(false);
+ private final ScheduledExecutorService exec = ScheduledExecutors.fixed(1, "InfluxdbEmitter-%s");
+ private final ImmutableSet<String> dimensionWhiteList;
+ private final LinkedBlockingQueue<ServiceMetricEvent> eventsQueue;
+ private static final Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+");
+
+ public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig)
+ {
+ this.influxdbEmitterConfig = influxdbEmitterConfig;
+ this.influxdbClient = HttpClientBuilder.create().build();
+ this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize());
+ this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist();
+ log.info("constructed influxdb emitter");
+ }
+
+ @Override
+ public void start()
+ {
+ synchronized (started) {
+ if (!started.get()) {
+ exec.scheduleAtFixedRate(
+ () -> transformAndSendToInfluxdb(eventsQueue),
+ influxdbEmitterConfig.getFlushDelay(),
+ influxdbEmitterConfig.getFlushPeriod(),
+ TimeUnit.MILLISECONDS
+ );
+ started.set(true);
+ }
+ }
+ }
+
+ @Override
+ public void emit(Event event)
+ {
+ if (event instanceof ServiceMetricEvent) {
+ ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
+ try {
+ eventsQueue.put(metricEvent);
+ }
+ catch (InterruptedException exception) {
+ log.error(exception, "Failed to add metricEvent to events queue.");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void postToInflux(String payload)
+ {
+ HttpPost post = new HttpPost(
+ "http://" + influxdbEmitterConfig.getHostname()
+ + ":" + influxdbEmitterConfig.getPort()
+ + "/write?db=" + influxdbEmitterConfig.getDatabaseName()
+ + "&u=" + influxdbEmitterConfig.getInfluxdbUserName()
+ + "&p=" + influxdbEmitterConfig.getInfluxdbPassword()
+ );
+
+ post.setEntity(new StringEntity(payload, ContentType.DEFAULT_TEXT));
+ post.setHeader("Content-Type", "application/x-www-form-urlencoded");
+
+ try {
+ influxdbClient.execute(post);
+ }
+ catch (IOException ex) {
+ log.info(ex, "Failed to post events to InfluxDB.");
+ }
+ finally {
+ post.releaseConnection();
+ }
+ }
+
+ public String transformForInfluxSystems(ServiceMetricEvent event)
+ {
+ // split Druid metric on slashes and join middle parts (if any) with "_"
+ String[] parts = getValue("metric", event).split("/");
+ String metric = String.join(
+ "_",
+ Arrays.asList(
+ Arrays.copyOfRange(
+ parts,
+ 1,
+ parts.length - 1
+ )
+ )
+ );
+
+ // measurement
+ StringBuilder payload = new StringBuilder("druid_");
+ payload.append(parts[0]);
+
+ // tags
+ StringBuilder tag = new StringBuilder(",service=");
+ tag.append(getValue("service", event));
+ String metricTag = parts.length == 2 ? "" : ",metric=druid_" + metric;
+ tag.append(metricTag);
+ tag.append(StringUtils.format(",hostname=%s", getValue("host", event).split(":")[0]));
+ ImmutableSet<String> dimNames = ImmutableSet.copyOf(event.getUserDims().keySet());
+ for (String dimName : dimNames) {
+ if (this.dimensionWhiteList.contains(dimName)) {
+ tag.append(StringUtils.format(",%1$s=%2$s", dimName, sanitize(String.valueOf(event.getUserDims().get(dimName)))));
+ }
+ }
+ payload.append(tag);
+
+ // fields
+ payload.append(StringUtils.format(" druid_%1$s=%2$s", parts[parts.length - 1], getValue("value", event)));
+
+ // timestamp
+ payload.append(StringUtils.format(" %d\n", event.getCreatedTime().getMillis() * 1000000));
+
+ return payload.toString();
+ }
+
+ private static String sanitize(String namespace)
+ {
+ return DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_");
+ }
+
+ public String getValue(String key, ServiceMetricEvent event)
+ {
+ switch (key) {
+ case "service":
+ return event.getService();
+ case "eventType":
+ return event.getClass().getSimpleName();
+ case "metric":
+ return event.getMetric();
+ case "feed":
+ return event.getFeed();
+ case "host":
+ return event.getHost();
+ case "value":
+ return event.getValue().toString();
+ default:
+ return key;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ if (started.get()) {
+ transformAndSendToInfluxdb(eventsQueue);
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ flush();
+ log.info("Closing [%s]", this.getClass().getName());
+ started.set(false);
+ exec.shutdownNow();
+ }
+
+ public void transformAndSendToInfluxdb(LinkedBlockingQueue<ServiceMetricEvent> eventsQueue)
+ {
+ StringBuilder payload = new StringBuilder();
+ int initialQueueSize = eventsQueue.size();
+ for (int i = 0; i < initialQueueSize; i++) {
+ payload.append(transformForInfluxSystems(eventsQueue.poll()));
+ }
+ postToInflux(payload.toString());
+ }
+
+}
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java
new file mode 100644
index 0000000..d96b070
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class InfluxdbEmitterConfig
+{
+
+ private static final int DEFAULT_PORT = 8086;
+ private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE;
+ private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds
+ private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier");
+
+ @JsonProperty
+ private final String hostname;
+ @JsonProperty
+ private final Integer port;
+ @JsonProperty
+ private final String databaseName;
+ @JsonProperty
+ private final Integer maxQueueSize;
+ @JsonProperty
+ private final Integer flushPeriod;
+ @JsonProperty
+ private final Integer flushDelay;
+ @JsonProperty
+ private final String influxdbUserName;
+ @JsonProperty
+ private final String influxdbPassword;
+ @JsonProperty
+ private final ImmutableSet<String> dimensionWhitelist;
+
+ private static Logger log = new Logger(InfluxdbEmitterConfig.class);
+
+ @JsonCreator
+ public InfluxdbEmitterConfig(
+ @JsonProperty("hostname") String hostname,
+ @JsonProperty("port") Integer port,
+ @JsonProperty("databaseName") String databaseName,
+ @JsonProperty("maxQueueSize") Integer maxQueueSize,
+ @JsonProperty("flushPeriod") Integer flushPeriod,
+ @JsonProperty("flushDelay") Integer flushDelay,
+ @JsonProperty("influxdbUserName") String influxdbUserName,
+ @JsonProperty("influxdbPassword") String influxdbPassword,
+ @JsonProperty("dimensionWhitelist") Set<String> dimensionWhitelist
+ )
+ {
+ this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
+ this.port = port == null ? DEFAULT_PORT : port;
+ this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null");
+ this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize;
+ this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
+ this.flushDelay = flushDelay == null ? DEFAULT_FLUSH_PERIOD : flushDelay;
+ this.influxdbUserName = Preconditions.checkNotNull(influxdbUserName, "influxdbUserName can not be null");
+ this.influxdbPassword = Preconditions.checkNotNull(influxdbPassword, "influxdbPassword can not be null");
+ this.dimensionWhitelist = dimensionWhitelist == null ? ImmutableSet.copyOf(DEFAULT_DIMENSION_WHITELIST) : ImmutableSet.copyOf(dimensionWhitelist);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof InfluxdbEmitterConfig)) {
+ return false;
+ }
+
+ InfluxdbEmitterConfig that = (InfluxdbEmitterConfig) o;
+
+ if (getPort() != that.getPort()) {
+ return false;
+ }
+ if (!getHostname().equals(that.getHostname())) {
+ return false;
+ }
+ if (!getDatabaseName().equals(that.getDatabaseName())) {
+ return false;
+ }
+ if (getFlushPeriod() != that.getFlushPeriod()) {
+ return false;
+ }
+ if (getMaxQueueSize() != that.getMaxQueueSize()) {
+ return false;
+ }
+ if (getFlushDelay() != that.getFlushDelay()) {
+ return false;
+ }
+ if (!getInfluxdbUserName().equals(that.getInfluxdbUserName())) {
+ return false;
+ }
+ if (!getInfluxdbPassword().equals(that.getInfluxdbPassword())) {
+ return false;
+ }
+ if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) {
+ return false;
+ }
+ return true;
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = getHostname().hashCode();
+ result = 31 * result + getPort();
+ result = 31 * result + getDatabaseName().hashCode();
+ result = 31 * result + getFlushPeriod();
+ result = 31 * result + getMaxQueueSize();
+ result = 31 * result + getFlushDelay();
+ result = 31 * result + getInfluxdbUserName().hashCode();
+ result = 31 * result + getInfluxdbPassword().hashCode();
+ result = 31 * result + getDimensionWhitelist().hashCode();
+ return result;
+ }
+
+ @JsonProperty
+ public String getHostname()
+ {
+ return hostname;
+ }
+
+ @JsonProperty
+ public int getPort()
+ {
+ return port;
+ }
+
+ @JsonProperty
+ public String getDatabaseName()
+ {
+ return databaseName;
+ }
+
+ @JsonProperty
+ public int getFlushPeriod()
+ {
+ return flushPeriod;
+ }
+
+ @JsonProperty
+ public int getMaxQueueSize()
+ {
+ return maxQueueSize;
+ }
+
+ @JsonProperty
+ public int getFlushDelay()
+ {
+ return flushDelay;
+ }
+
+ @JsonProperty
+ public String getInfluxdbUserName()
+ {
+ return influxdbUserName;
+ }
+
+ @JsonProperty
+ public String getInfluxdbPassword()
+ {
+ return influxdbPassword;
+ }
+
+ @JsonProperty
+ public ImmutableSet<String> getDimensionWhitelist()
+ {
+ return dimensionWhitelist;
+ }
+}
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java
new file mode 100644
index 0000000..f6e6fa4
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.core.Emitter;
+
+import java.util.Collections;
+import java.util.List;
+
+public class InfluxdbEmitterModule implements DruidModule
+{
+
+ private static final String EMITTER_TYPE = "influxdb";
+ private static final Logger log = new Logger(InfluxdbEmitterModule.class);
+
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, InfluxdbEmitterConfig.class);
+ }
+
+ @Provides
+ @ManageLifecycle
+ @Named(EMITTER_TYPE)
+ public Emitter getEmitter(InfluxdbEmitterConfig influxdbEmitterConfig, ObjectMapper mapper)
+ {
+ return new InfluxdbEmitter(influxdbEmitterConfig);
+ }
+}
diff --git a/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 0000000..fafe8ee
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.emitter.influxdb.InfluxdbEmitterModule
diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java
new file mode 100644
index 0000000..aa0d061
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class InfluxdbEmitterConfigTest
+{
+ private ObjectMapper mapper = new DefaultObjectMapper();
+ private InfluxdbEmitterConfig influxdbEmitterConfig;
+
+ @Before
+ public void setUp()
+ {
+ mapper.setInjectableValues(new InjectableValues.Std().addValue(
+ ObjectMapper.class,
+ new DefaultObjectMapper()
+ ));
+
+ influxdbEmitterConfig = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ }
+
+ @Test
+ public void testInfluxdbEmitterConfigObjectsAreDifferent() throws IOException
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
+ "localhost",
+ 8080,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ Assert.assertNotEquals(influxdbEmitterConfig, influxdbEmitterConfigComparison);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testConfigWithNullHostname() throws IOException
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
+ null,
+ 8080,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ }
+
+ @Test
+ public void testConfigWithNullPort() throws IOException
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig(
+ "localhost",
+ null,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ int expectedPort = 8086;
+ Assert.assertEquals(expectedPort, influxdbEmitterConfigWithNullPort.getPort());
+ }
+
+ @Test
+ public void testEqualsMethod()
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ Assert.assertTrue(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison));
+ }
+
+ @Test
+ public void testEqualsMethodWithNotEqualConfigs()
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 10000,
+ "adam",
+ "password",
+ null
+ );
+ Assert.assertFalse(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testConfigWithNullInfluxdbUserName() throws IOException
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigWithNullUserName = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ null,
+ "password",
+ null
+ );
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testConfigWithNullInfluxdbPassword() throws IOException
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfigWithNullPassword = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ null,
+ null
+ );
+ }
+
+ @Test
+ public void testConfigWithNullDimensionWhitelist()
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier"));
+ Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
+ }
+
+ @Test
+ public void testConfigWithDimensionWhitelist()
+ {
+ InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ ImmutableSet.of("dataSource", "taskType")
+ );
+ ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "taskType"));
+ Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
+ }
+
+}
diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java
new file mode 100644
index 0000000..2095a2f
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InfluxdbEmitterTest
+{
+
+ private ServiceMetricEvent event;
+
+ @Before
+ public void setUp()
+ {
+ DateTime date = new DateTime(2017,
+ 10,
+ 30,
+ 10,
+ 0,
+ DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+ String metric = "metric/te/st/value";
+ Number value = 1234;
+ ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+ "service",
+ "druid/historical",
+ "host",
+ "localhost",
+ "version",
+ "0.10.0"
+ );
+ ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ builder.setDimension("nonWhiteListedDim", "test");
+ builder.setDimension("dataSource", "test_datasource");
+ ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+ event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+ }
+
+ @Test
+ public void testTransformForInfluxWithLongMetric()
+ {
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ String expected =
+ "druid_metric,service=druid/historical,metric=druid_te_st,hostname=localhost,dataSource=test_datasource druid_value=1234 1509357600000000000"
+ + "\n";
+ String actual = influxdbEmitter.transformForInfluxSystems(event);
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testTransformForInfluxWithShortMetric()
+ {
+ DateTime date = new DateTime(2017,
+ 10,
+ 30,
+ 10,
+ 0,
+ DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+ String metric = "metric/time";
+ Number value = 1234;
+ ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+ "service",
+ "druid/historical",
+ "host",
+ "localhost",
+ "version",
+ "0.10.0"
+ );
+ ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+ ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ String expected = "druid_metric,service=druid/historical,hostname=localhost druid_time=1234 1509357600000000000"
+ + "\n";
+ String actual = influxdbEmitter.transformForInfluxSystems(event);
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testMetricIsInDimensionWhitelist()
+ {
+ DateTime date = new DateTime(2017,
+ 10,
+ 30,
+ 10,
+ 0,
+ DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+ String metric = "metric/time";
+ Number value = 1234;
+ ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+ "service",
+ "druid/historical",
+ "host",
+ "localhost",
+ "version",
+ "0.10.0"
+ );
+ ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ builder.setDimension("dataSource", "wikipedia");
+ builder.setDimension("taskType", "index");
+ ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+ ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ ImmutableSet.of("dataSource")
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia druid_time=1234 1509357600000000000"
+ + "\n";
+ String actual = influxdbEmitter.transformForInfluxSystems(event);
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testMetricIsInDefaultDimensionWhitelist()
+ {
+ DateTime date = new DateTime(2017,
+ 10,
+ 30,
+ 10,
+ 0,
+ DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+ String metric = "metric/time";
+ Number value = 1234;
+ ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+ "service",
+ "druid/historical",
+ "host",
+ "localhost",
+ "version",
+ "0.10.0"
+ );
+ ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ builder.setDimension("dataSource", "wikipedia");
+ builder.setDimension("taskType", "index");
+ ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+ ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+ InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+ "localhost",
+ 8086,
+ "dbname",
+ 10000,
+ 15000,
+ 30000,
+ "adam",
+ "password",
+ null
+ );
+ InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+ String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia,taskType=index druid_time=1234 1509357600000000000"
+ + "\n";
+ String actual = influxdbEmitter.transformForInfluxSystems(event);
+ Assert.assertEquals(expected, actual);
+ }
+}
diff --git a/pom.xml b/pom.xml
index df24ca8..0c5c5ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,6 +175,7 @@
<module>extensions-contrib/momentsketch</module>
<module>extensions-contrib/moving-average-query</module>
<module>extensions-contrib/tdigestsketch</module>
+ <module>extensions-contrib/influxdb-emitter</module>
<!-- distribution packaging -->
<module>distribution</module>
</modules>