You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by hi...@apache.org on 2019/05/23 18:11:55 UTC

[incubator-druid] branch master updated: Adding influxdb emitter as a contrib extension (#7717)

This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6964ac2  Adding influxdb emitter as a contrib extension (#7717)
6964ac2 is described below

commit 6964ac23a2a3b95f29a2d4aa443ef9fcbc84dfdb
Author: awelsh93 <32...@users.noreply.github.com>
AuthorDate: Thu May 23 19:11:48 2019 +0100

    Adding influxdb emitter as a contrib extension (#7717)
    
    * Adding influxdb emitter as a contrib extension
    
    * addressing code review comments
---
 distribution/pom.xml                               |   2 +
 .../extensions-contrib/influxdb-emitter.md         |  75 ++++++++
 docs/content/development/extensions.md             |   1 +
 extensions-contrib/influxdb-emitter/pom.xml        |  74 +++++++
 .../druid/emitter/influxdb/InfluxdbEmitter.java    | 214 +++++++++++++++++++++
 .../emitter/influxdb/InfluxdbEmitterConfig.java    | 196 +++++++++++++++++++
 .../emitter/influxdb/InfluxdbEmitterModule.java    |  61 ++++++
 .../org.apache.druid.initialization.DruidModule    |  16 ++
 .../influxdb/InfluxdbEmitterConfigTest.java        | 212 ++++++++++++++++++++
 .../emitter/influxdb/InfluxdbEmitterTest.java      | 208 ++++++++++++++++++++
 pom.xml                                            |   1 +
 11 files changed, 1060 insertions(+)

diff --git a/distribution/pom.xml b/distribution/pom.xml
index e25b3c5..4ec9b26 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -324,6 +324,8 @@
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions.contrib:druid-influx-extensions</argument>
                                         <argument>-c</argument>
+                                        <argument>org.apache.druid.extensions.contrib:druid-influxdb-emitter</argument>
+                                        <argument>-c</argument>
                                         <argument>org.apache.druid.extensions.contrib:druid-kafka-eight-simple-consumer</argument>
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions.contrib:kafka-emitter</argument>
diff --git a/docs/content/development/extensions-contrib/influxdb-emitter.md b/docs/content/development/extensions-contrib/influxdb-emitter.md
new file mode 100644
index 0000000..138a0bb
--- /dev/null
+++ b/docs/content/development/extensions-contrib/influxdb-emitter.md
@@ -0,0 +1,75 @@
+---
+layout: doc_page
+title: "InfluxDB Emitter"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# InfluxDB Emitter
+
+To use this Apache Druid (incubating) extension, make sure to [include](../../operations/including-extensions.html) `druid-influxdb-emitter` extension.
+
+## Introduction
+
+This extension emits druid metrics to [InfluxDB](https://www.influxdata.com/time-series-platform/influxdb/) over HTTP. Currently this emitter only emits service metric events to InfluxDB (See [Druid metrics](../../operations/metrics.html) for a list of metrics).
+When a metric event is fired it is added to a queue of events. After a configurable amount of time, the events on the queue are transformed to InfluxDB's line protocol 
+and POSTed to the InfluxDB HTTP API. The entire queue is flushed at this point. The queue is also flushed as the emitter is shutdown.
+
+Note that authentication and authorization must be [enabled](https://docs.influxdata.com/influxdb/v1.7/administration/authentication_and_authorization/) on the InfluxDB server.
+
+## Configuration
+
+All the configuration parameters for the influxdb emitter are under `druid.emitter.influxdb`.
+
+|Property|Description|Required?|Default|
+|--------|-----------|---------|-------|
+|`druid.emitter.influxdb.hostname`|The hostname of the InfluxDB server.|Yes|N/A|
+|`druid.emitter.influxdb.port`|The port of the InfluxDB server.|No|8086|
+|`druid.emitter.influxdb.databaseName`|The name of the database in InfluxDB.|Yes|N/A|
+|`druid.emitter.influxdb.maxQueueSize`|The size of the queue that holds events.|No|Integer.Max_Value(=2^31-1)|
+|`druid.emitter.influxdb.flushPeriod`|How often (in milliseconds) the events queue is parsed into Line Protocol and POSTed to InfluxDB.|No|60000|
+|`druid.emitter.influxdb.flushDelay`|How long (in milliseconds) the scheduled method will wait until it first runs.|No|60000|
+|`druid.emitter.influxdb.influxdbUserName`|The username for authenticating with the InfluxDB database.|Yes|N/A|
+|`druid.emitter.influxdb.influxdbPassword`|The password of the database authorized user|Yes|N/A|
+|`druid.emitter.influxdb.dimensionWhitelist`|A whitelist of metric dimensions to include as tags|No|`["dataSource","type","numMetrics","numDimensions","threshold","dimension","taskType","taskStatus","tier"]`|
+
+## InfluxDB Line Protocol
+
+An example of how this emitter parses a Druid metric event into InfluxDB's [line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_reference/) is given here: 
+
+The syntax of the line protocol is :  
+
+`<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]`
+ 
+where timestamp is in nano-seconds since epoch.
+
+A typical service metric event as recorded by Druid's logging emitter is: `Event [{"feed":"metrics","timestamp":"2017-10-31T09:09:06.857Z","service":"druid/historical","host":"historical001:8083","version":"0.11.0-SNAPSHOT","metric":"query/cache/total/hits","value":34787256}]`.
+
+This event is parsed into line protocol according to these rules:
+
+* The measurement becomes druid_query since query is the first part of the metric. 
+* The tags are service=druid/historical, hostname=historical001, metric=druid_cache_total. (The metric tag is the middle part of the druid metric separated with _ and preceded by druid_. Another example would be if an event has metric=query/time then there is no middle part and hence no metric tag)
+* The field is druid_hits since this is the last part of the metric.
+
+This gives the following String which can be POSTed to InfluxDB: `"druid_query,service=druid/historical,hostname=historical001,metric=druid_cache_total druid_hits=34787256 1509440946857000000"`
+
+The InfluxDB emitter has a white list of dimensions
+which will be added as a tag to the line protocol string if the metric has a dimension from the white list.
+The value of the dimension is sanitized such that every occurence of a dot or whitespace is replaced with a `_` .
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 5112ee9..c58044c 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -96,6 +96,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
 |druid-thrift-extensions|Support thrift ingestion |[link](../development/extensions-contrib/thrift.html)|
 |druid-opentsdb-emitter|OpenTSDB metrics emitter |[link](../development/extensions-contrib/opentsdb-emitter.html)|
 |druid-moving-average-query|Support for [Moving Average](https://en.wikipedia.org/wiki/Moving_average) and other Aggregate [Window Functions](https://en.wikibooks.org/wiki/Structured_Query_Language/Window_functions) in Druid queries.|[link](../development/extensions-contrib/moving-average-query.html)|
+|druid-influxdb-emitter|InfluxDB metrics emitter|[link](../development/extensions-contrib/influxdb-emitter.html)|
 
 ## Promoting Community Extension to Core Extension
 
diff --git a/extensions-contrib/influxdb-emitter/pom.xml b/extensions-contrib/influxdb-emitter/pom.xml
new file mode 100644
index 0000000..3605fd0
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <groupId>org.apache.druid.extensions.contrib</groupId>
+    <artifactId>druid-influxdb-emitter</artifactId>
+    <name>druid-influxdb-emitter</name>
+    <description>influxdb-emitter</description>
+
+    <parent>
+        <groupId>org.apache.druid</groupId>
+        <artifactId>druid</artifactId>
+        <version>0.15.0-incubating-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>pl.pragmatists</groupId>
+            <artifactId>JUnitParams</artifactId>
+            <version>1.0.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.3.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java
new file mode 100644
index 0000000..93f7bc2
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitter.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.core.Emitter;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+
+public class InfluxdbEmitter implements Emitter
+{
+
+  private static final Logger log = new Logger(InfluxdbEmitter.class);
+  private final HttpClient influxdbClient;
+  private final InfluxdbEmitterConfig influxdbEmitterConfig;
+  private final AtomicBoolean started = new AtomicBoolean(false);
+  private final ScheduledExecutorService exec = ScheduledExecutors.fixed(1, "InfluxdbEmitter-%s");
+  private final ImmutableSet dimensionWhiteList;
+  private final LinkedBlockingQueue<ServiceMetricEvent> eventsQueue;
+  private static final Pattern DOT_OR_WHITESPACE = Pattern.compile("[\\s]+|[.]+");
+
+  public InfluxdbEmitter(InfluxdbEmitterConfig influxdbEmitterConfig)
+  {
+    this.influxdbEmitterConfig = influxdbEmitterConfig;
+    this.influxdbClient = HttpClientBuilder.create().build();
+    this.eventsQueue = new LinkedBlockingQueue<>(influxdbEmitterConfig.getMaxQueueSize());
+    this.dimensionWhiteList = influxdbEmitterConfig.getDimensionWhitelist();
+    log.info("constructed influxdb emitter");
+  }
+
+  @Override
+  public void start()
+  {
+    synchronized (started) {
+      if (!started.get()) {
+        exec.scheduleAtFixedRate(
+            () -> transformAndSendToInfluxdb(eventsQueue),
+            influxdbEmitterConfig.getFlushDelay(),
+            influxdbEmitterConfig.getFlushPeriod(),
+            TimeUnit.MILLISECONDS
+        );
+        started.set(true);
+      }
+    }
+  }
+
+  @Override
+  public void emit(Event event)
+  {
+    if (event instanceof ServiceMetricEvent) {
+      ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
+      try {
+        eventsQueue.put(metricEvent);
+      }
+      catch (InterruptedException exception) {
+        log.error(exception, "Failed to add metricEvent to events queue.");
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  public void postToInflux(String payload)
+  {
+    HttpPost post = new HttpPost(
+        "http://" + influxdbEmitterConfig.getHostname()
+        + ":" + influxdbEmitterConfig.getPort()
+        + "/write?db=" + influxdbEmitterConfig.getDatabaseName()
+        + "&u=" + influxdbEmitterConfig.getInfluxdbUserName()
+        + "&p=" + influxdbEmitterConfig.getInfluxdbPassword()
+    );
+
+    post.setEntity(new StringEntity(payload, ContentType.DEFAULT_TEXT));
+    post.setHeader("Content-Type", "application/x-www-form-urlencoded");
+
+    try {
+      influxdbClient.execute(post);
+    }
+    catch (IOException ex) {
+      log.info(ex, "Failed to post events to InfluxDB.");
+    }
+    finally {
+      post.releaseConnection();
+    }
+  }
+
+  public String transformForInfluxSystems(ServiceMetricEvent event)
+  {
+    // split Druid metric on slashes and join middle parts (if any) with "_"
+    String[] parts = getValue("metric", event).split("/");
+    String metric = String.join(
+        "_",
+        Arrays.asList(
+            Arrays.copyOfRange(
+                parts,
+                1,
+                parts.length - 1
+            )
+        )
+    );
+
+    // measurement
+    StringBuilder payload = new StringBuilder("druid_");
+    payload.append(parts[0]);
+
+    // tags
+    StringBuilder tag = new StringBuilder(",service=");
+    tag.append(getValue("service", event));
+    String metricTag = parts.length == 2 ? "" : ",metric=druid_" + metric;
+    tag.append(metricTag);
+    tag.append(StringUtils.format(",hostname=%s", getValue("host", event).split(":")[0]));
+    ImmutableSet<String> dimNames = ImmutableSet.copyOf(event.getUserDims().keySet());
+    for (String dimName : dimNames) {
+      if (this.dimensionWhiteList.contains(dimName)) {
+        tag.append(StringUtils.format(",%1$s=%2$s", dimName, sanitize(String.valueOf(event.getUserDims().get(dimName)))));
+      }
+    }
+    payload.append(tag);
+
+    // fields
+    payload.append(StringUtils.format(" druid_%1$s=%2$s", parts[parts.length - 1], getValue("value", event)));
+
+    // timestamp
+    payload.append(StringUtils.format(" %d\n", event.getCreatedTime().getMillis() * 1000000));
+
+    return payload.toString();
+  }
+
+  private static String sanitize(String namespace)
+  {
+    return DOT_OR_WHITESPACE.matcher(namespace).replaceAll("_");
+  }
+
+  public String getValue(String key, ServiceMetricEvent event)
+  {
+    switch (key) {
+      case "service":
+        return event.getService();
+      case "eventType":
+        return event.getClass().getSimpleName();
+      case "metric":
+        return event.getMetric();
+      case "feed":
+        return event.getFeed();
+      case "host":
+        return event.getHost();
+      case "value":
+        return event.getValue().toString();
+      default:
+        return key;
+    }
+  }
+
+  @Override
+  public void flush() throws IOException
+  {
+    if (started.get()) {
+      transformAndSendToInfluxdb(eventsQueue);
+    }
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    flush();
+    log.info("Closing [%s]", this.getClass().getName());
+    started.set(false);
+    exec.shutdownNow();
+  }
+
+  public void transformAndSendToInfluxdb(LinkedBlockingQueue<ServiceMetricEvent> eventsQueue)
+  {
+    StringBuilder payload = new StringBuilder();
+    int initialQueueSize = eventsQueue.size();
+    for (int i = 0; i < initialQueueSize; i++) {
+      payload.append(transformForInfluxSystems(eventsQueue.poll()));
+    }
+    postToInflux(payload.toString());
+  }
+
+}
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java
new file mode 100644
index 0000000..d96b070
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfig.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class InfluxdbEmitterConfig
+{
+
+  private static final int DEFAULT_PORT = 8086;
+  private static final int DEFAULT_QUEUE_SIZE = Integer.MAX_VALUE;
+  private static final int DEFAULT_FLUSH_PERIOD = 60000; // milliseconds
+  private static final List<String> DEFAULT_DIMENSION_WHITELIST = Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier");
+
+  @JsonProperty
+  private final String hostname;
+  @JsonProperty
+  private final Integer port;
+  @JsonProperty
+  private final String databaseName;
+  @JsonProperty
+  private final Integer maxQueueSize;
+  @JsonProperty
+  private final Integer flushPeriod;
+  @JsonProperty
+  private final Integer flushDelay;
+  @JsonProperty
+  private final String influxdbUserName;
+  @JsonProperty
+  private final String influxdbPassword;
+  @JsonProperty
+  private final ImmutableSet<String> dimensionWhitelist;
+
+  private static Logger log = new Logger(InfluxdbEmitterConfig.class);
+
+  @JsonCreator
+  public InfluxdbEmitterConfig(
+      @JsonProperty("hostname") String hostname,
+      @JsonProperty("port") Integer port,
+      @JsonProperty("databaseName") String databaseName,
+      @JsonProperty("maxQueueSize") Integer maxQueueSize,
+      @JsonProperty("flushPeriod") Integer flushPeriod,
+      @JsonProperty("flushDelay") Integer flushDelay,
+      @JsonProperty("influxdbUserName") String influxdbUserName,
+      @JsonProperty("influxdbPassword") String influxdbPassword,
+      @JsonProperty("dimensionWhitelist") Set<String> dimensionWhitelist
+  )
+  {
+    this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
+    this.port = port == null ? DEFAULT_PORT : port;
+    this.databaseName = Preconditions.checkNotNull(databaseName, "databaseName can not be null");
+    this.maxQueueSize = maxQueueSize == null ? DEFAULT_QUEUE_SIZE : maxQueueSize;
+    this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
+    this.flushDelay = flushDelay == null ? DEFAULT_FLUSH_PERIOD : flushDelay;
+    this.influxdbUserName = Preconditions.checkNotNull(influxdbUserName, "influxdbUserName can not be null");
+    this.influxdbPassword = Preconditions.checkNotNull(influxdbPassword, "influxdbPassword can not be null");
+    this.dimensionWhitelist = dimensionWhitelist == null ? ImmutableSet.copyOf(DEFAULT_DIMENSION_WHITELIST) : ImmutableSet.copyOf(dimensionWhitelist);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof InfluxdbEmitterConfig)) {
+      return false;
+    }
+
+    InfluxdbEmitterConfig that = (InfluxdbEmitterConfig) o;
+
+    if (getPort() != that.getPort()) {
+      return false;
+    }
+    if (!getHostname().equals(that.getHostname())) {
+      return false;
+    }
+    if (!getDatabaseName().equals(that.getDatabaseName())) {
+      return false;
+    }
+    if (getFlushPeriod() != that.getFlushPeriod()) {
+      return false;
+    }
+    if (getMaxQueueSize() != that.getMaxQueueSize()) {
+      return false;
+    }
+    if (getFlushDelay() != that.getFlushDelay()) {
+      return false;
+    }
+    if (!getInfluxdbUserName().equals(that.getInfluxdbUserName())) {
+      return false;
+    }
+    if (!getInfluxdbPassword().equals(that.getInfluxdbPassword())) {
+      return false;
+    }
+    if (!getDimensionWhitelist().equals(that.getDimensionWhitelist())) {
+      return false;
+    }
+    return true;
+
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = getHostname().hashCode();
+    result = 31 * result + getPort();
+    result = 31 * result + getDatabaseName().hashCode();
+    result = 31 * result + getFlushPeriod();
+    result = 31 * result + getMaxQueueSize();
+    result = 31 * result + getFlushDelay();
+    result = 31 * result + getInfluxdbUserName().hashCode();
+    result = 31 * result + getInfluxdbPassword().hashCode();
+    result = 31 * result + getDimensionWhitelist().hashCode();
+    return result;
+  }
+
+  @JsonProperty
+  public String getHostname()
+  {
+    return hostname;
+  }
+
+  @JsonProperty
+  public int getPort()
+  {
+    return port;
+  }
+
+  @JsonProperty
+  public String getDatabaseName()
+  {
+    return databaseName;
+  }
+
+  @JsonProperty
+  public int getFlushPeriod()
+  {
+    return flushPeriod;
+  }
+
+  @JsonProperty
+  public int getMaxQueueSize()
+  {
+    return maxQueueSize;
+  }
+
+  @JsonProperty
+  public int getFlushDelay()
+  {
+    return flushDelay;
+  }
+
+  @JsonProperty
+  public String getInfluxdbUserName()
+  {
+    return influxdbUserName;
+  }
+
+  @JsonProperty
+  public String getInfluxdbPassword()
+  {
+    return influxdbPassword;
+  }
+
+  @JsonProperty
+  public ImmutableSet<String> getDimensionWhitelist()
+  {
+    return dimensionWhitelist;
+  }
+}
diff --git a/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java
new file mode 100644
index 0000000..f6e6fa4
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterModule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.name.Named;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.core.Emitter;
+
+import java.util.Collections;
+import java.util.List;
+
+public class InfluxdbEmitterModule implements DruidModule
+{
+
+  private static final String EMITTER_TYPE = "influxdb";
+  private static final Logger log = new Logger(InfluxdbEmitterModule.class);
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    JsonConfigProvider.bind(binder, "druid.emitter." + EMITTER_TYPE, InfluxdbEmitterConfig.class);
+  }
+
+  @Provides
+  @ManageLifecycle
+  @Named(EMITTER_TYPE)
+  public Emitter getEmitter(InfluxdbEmitterConfig influxdbEmitterConfig, ObjectMapper mapper)
+  {
+    return new InfluxdbEmitter(influxdbEmitterConfig);
+  }
+}
diff --git a/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 0000000..fafe8ee
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.emitter.influxdb.InfluxdbEmitterModule
diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java
new file mode 100644
index 0000000..aa0d061
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterConfigTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class InfluxdbEmitterConfigTest
+{
+  private ObjectMapper mapper = new DefaultObjectMapper();
+  private InfluxdbEmitterConfig influxdbEmitterConfig;
+
+  @Before
+  public void setUp()
+  {
+    mapper.setInjectableValues(new InjectableValues.Std().addValue(
+        ObjectMapper.class,
+        new DefaultObjectMapper()
+    ));
+
+    influxdbEmitterConfig = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+  }
+
+  @Test
+  public void testInfluxdbEmitterConfigObjectsAreDifferent() throws IOException
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
+        "localhost",
+        8080,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+    Assert.assertNotEquals(influxdbEmitterConfig, influxdbEmitterConfigComparison);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testConfigWithNullHostname() throws IOException
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
+        null,
+        8080,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+  }
+
+  @Test
+  public void testConfigWithNullPort() throws IOException
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfigWithNullPort = new InfluxdbEmitterConfig(
+        "localhost",
+        null,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+    int expectedPort = 8086;
+    Assert.assertEquals(expectedPort, influxdbEmitterConfig.getPort());
+  }
+
+  @Test
+  public void testEqualsMethod()
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+    Assert.assertTrue(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison));
+  }
+
+  @Test
+  public void testEqualsMethodWithNotEqualConfigs()
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfigComparison = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        10000,
+        "adam",
+        "password",
+        null
+    );
+    Assert.assertFalse(influxdbEmitterConfig.equals(influxdbEmitterConfigComparison));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testConfigWithNullInfluxdbUserName() throws IOException
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        null,
+        "password",
+        null
+    );
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testConfigWithNullInfluxdbPassword() throws IOException
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfigWithNullHostname = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        null,
+        null
+    );
+  }
+
+  @Test
+  public void testConfigWithNullDimensionWhitelist()
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+    ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "type", "numMetrics", "numDimensions", "threshold", "dimension", "taskType", "taskStatus", "tier"));
+    Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
+  }
+
+  @Test
+  public void testConfigWithDimensionWhitelist()
+  {
+    InfluxdbEmitterConfig influxdbEmitterConfig = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        ImmutableSet.of("dataSource", "taskType")
+    );
+    ImmutableSet<String> expected = ImmutableSet.copyOf(Arrays.asList("dataSource", "taskType"));
+    Assert.assertEquals(expected, influxdbEmitterConfig.getDimensionWhitelist());
+  }
+
+}
diff --git a/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java
new file mode 100644
index 0000000..2095a2f
--- /dev/null
+++ b/extensions-contrib/influxdb-emitter/src/test/java/org/apache/druid/emitter/influxdb/InfluxdbEmitterTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.influxdb;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InfluxdbEmitterTest
+{
+
+  private ServiceMetricEvent event;
+
+  @Before
+  public void setUp()
+  {
+    DateTime date = new DateTime(2017,
+                                 10,
+                                 30,
+                                 10,
+                                 00,
+                                 DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+    String metric = "metric/te/st/value";
+    Number value = 1234;
+    ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+        "service",
+        "druid/historical",
+        "host",
+        "localhost",
+        "version",
+        "0.10.0"
+    );
+    ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    builder.setDimension("nonWhiteListedDim", "test");
+    builder.setDimension("dataSource", "test_datasource");
+    ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+    event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+  }
+
+  @Test
+  public void testTransformForInfluxWithLongMetric()
+  {
+    InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+    InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+    String expected =
+        "druid_metric,service=druid/historical,metric=druid_te_st,hostname=localhost,dataSource=test_datasource druid_value=1234 1509357600000000000"
+        + "\n";
+    String actual = influxdbEmitter.transformForInfluxSystems(event);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testTransformForInfluxWithShortMetric()
+  {
+    DateTime date = new DateTime(2017,
+                                 10,
+                                 30,
+                                 10,
+                                 00,
+                                 DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+    String metric = "metric/time";
+    Number value = 1234;
+    ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+        "service",
+        "druid/historical",
+        "host",
+        "localhost",
+        "version",
+        "0.10.0"
+    );
+    ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+    ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+    InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+    InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+    String expected = "druid_metric,service=druid/historical,hostname=localhost druid_time=1234 1509357600000000000"
+                      + "\n";
+    String actual = influxdbEmitter.transformForInfluxSystems(event);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testMetricIsInDimensionWhitelist()
+  {
+    DateTime date = new DateTime(2017,
+                                 10,
+                                 30,
+                                 10,
+                                 00,
+                                 DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+    String metric = "metric/time";
+    Number value = 1234;
+    ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+        "service",
+        "druid/historical",
+        "host",
+        "localhost",
+        "version",
+        "0.10.0"
+    );
+    ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+    builder.setDimension("dataSource", "wikipedia");
+    builder.setDimension("taskType", "index");
+    ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+    InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        ImmutableSet.of("dataSource")
+    );
+    InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+    String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia druid_time=1234 1509357600000000000"
+                      + "\n";
+    String actual = influxdbEmitter.transformForInfluxSystems(event);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testMetricIsInDefaultDimensionWhitelist()
+  {
+    DateTime date = new DateTime(2017,
+                                 10,
+                                 30,
+                                 10,
+                                 00,
+                                 DateTimeZone.UTC); // 10:00am on 30/10/2017 = 1509357600000000000 in epoch nanoseconds
+    String metric = "metric/time";
+    Number value = 1234;
+    ImmutableMap<String, String> serviceDims = ImmutableMap.of(
+        "service",
+        "druid/historical",
+        "host",
+        "localhost",
+        "version",
+        "0.10.0"
+    );
+    ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    ServiceEventBuilder eventBuilder = builder.build(date, metric, value);
+    builder.setDimension("dataSource", "wikipedia");
+    builder.setDimension("taskType", "index");
+    ServiceMetricEvent event = (ServiceMetricEvent) eventBuilder.build(serviceDims);
+    InfluxdbEmitterConfig config = new InfluxdbEmitterConfig(
+        "localhost",
+        8086,
+        "dbname",
+        10000,
+        15000,
+        30000,
+        "adam",
+        "password",
+        null
+    );
+    InfluxdbEmitter influxdbEmitter = new InfluxdbEmitter(config);
+    String expected = "druid_metric,service=druid/historical,hostname=localhost,dataSource=wikipedia,taskType=index druid_time=1234 1509357600000000000"
+                      + "\n";
+    String actual = influxdbEmitter.transformForInfluxSystems(event);
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git a/pom.xml b/pom.xml
index df24ca8..0c5c5ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,6 +175,7 @@
         <module>extensions-contrib/momentsketch</module>
         <module>extensions-contrib/moving-average-query</module>
         <module>extensions-contrib/tdigestsketch</module>
+        <module>extensions-contrib/influxdb-emitter</module>
         <!-- distribution packaging -->
         <module>distribution</module>
     </modules>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org