You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/09/20 13:13:29 UTC
[1/2] bahir-flink git commit: [BAHIR-134] Add InfluxDb sink for flink
stream
Repository: bahir-flink
Updated Branches:
refs/heads/master b580566f0 -> 4f0179a17
[BAHIR-134] Add InfluxDb sink for flink stream
This closes #21
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/f07276ee
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/f07276ee
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/f07276ee
Branch: refs/heads/master
Commit: f07276eef2babc52ffdb43c5fcb76f9d51b9153f
Parents: b580566
Author: zhouhai02 <zh...@meituan.com>
Authored: Sun Aug 27 19:35:31 2017 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Sep 20 15:05:28 2017 +0200
----------------------------------------------------------------------
flink-connector-influxdb/README.md | 32 +++
.../examples/influxdb/InfluxDBSinkExample.java | 94 +++++++
.../src/main/resources/log4j.properties | 23 ++
flink-connector-influxdb/pom.xml | 78 ++++++
.../connectors/influxdb/InfluxDBConfig.java | 257 +++++++++++++++++++
.../connectors/influxdb/InfluxDBPoint.java | 78 ++++++
.../connectors/influxdb/InfluxDBSink.java | 106 ++++++++
pom.xml | 1 +
8 files changed, 669 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/README.md b/flink-connector-influxdb/README.md
new file mode 100644
index 0000000..0f9e477
--- /dev/null
+++ b/flink-connector-influxdb/README.md
@@ -0,0 +1,32 @@
+# Flink InfluxDB Connector
+
+This connector provides a sink that can send data to [InfluxDB](https://www.influxdata.com/). To use this connector, add the
+following dependency to your project:
+
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>flink-connector-influxdb_2.11</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </dependency>
+
+*Version Compatibility*: This module is compatible with InfluxDB 1.3.x
+*Requirements*: Java 1.8+
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html).
+
+## Installing InfluxDB
+Follow the instructions from the [InfluxDB download page](https://portal.influxdata.com/downloads#influxdb).
+
+## Examples
+
+### JAVA API
+
+ DataStream<InfluxDBPoint> dataStream = ...
+ InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test").build();
+ dataStream.addSink(new InfluxDBSink(influxDBConfig));
+
+
+See end-to-end examples at [InfluxDB Examples](https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb/examples)
+
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java b/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java
new file mode 100644
index 0000000..3047743
--- /dev/null
+++ b/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.influxdb;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig;
+import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint;
+import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is an example showing how to use the InfluxDB Sink in the Streaming API.
+ *
+ * <p>The example assumes that a database exists in a local InfluxDB server, according to the following query:
+ *
+ * <p>curl -POST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE db_flink_test"
+ */
+public class InfluxDBSinkExample {
+    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBSinkExample.class);
+
+    /** Number of synthetic servers; three records (cpu, mem, disk) are generated for each of them. */
+    private static final int N = 10000;
+
+    /**
+     * Builds a bounded stream of "measurement#host" strings, converts each element into an
+     * {@link InfluxDBPoint} and writes the points to the local InfluxDB database "db_flink_test".
+     *
+     * @param args command line arguments (not used)
+     * @throws Exception if the job execution fails
+     */
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        List<String> dataList = new ArrayList<>();
+        for (int i = 0; i < N; ++i) {
+            String id = "server" + i;
+            dataList.add("cpu#" + id);
+            dataList.add("mem#" + id);
+            dataList.add("disk#" + id);
+        }
+        DataStream<String> source = env.fromElements(dataList.toArray(new String[0]));
+
+        DataStream<InfluxDBPoint> dataStream = source.map(
+                new RichMapFunction<String, InfluxDBPoint>() {
+                    @Override
+                    public InfluxDBPoint map(String s) throws Exception {
+                        // Each element has the form "<measurement>#<host>".
+                        String[] input = s.split("#");
+
+                        String measurement = input[0];
+                        String host = input[1];
+                        long timestamp = System.currentTimeMillis();
+
+                        // String.hashCode() may be negative and '%' keeps the sign of the dividend;
+                        // floorMod keeps the derived region id and sample values non-negative.
+                        int hash = host.hashCode();
+
+                        HashMap<String, String> tags = new HashMap<>();
+                        tags.put("host", host);
+                        tags.put("region", "region#" + Math.floorMod(hash, 20));
+
+                        HashMap<String, Object> fields = new HashMap<>();
+                        fields.put("value1", Math.floorMod(hash, 100));
+                        fields.put("value2", Math.floorMod(hash, 50));
+
+                        return new InfluxDBPoint(measurement, timestamp, tags, fields);
+                    }
+                }
+        );
+
+        // The builder can also be created with
+        // new InfluxDBConfig.Builder("http://localhost:8086", "root", "root", "db_flink_test").
+        InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test")
+                .batchActions(1000)
+                .flushDuration(100, TimeUnit.MILLISECONDS)
+                .enableGzip(true)
+                .build();
+
+        dataStream.addSink(new InfluxDBSink(influxDBConfig));
+
+        env.execute("InfluxDB Sink Example");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/examples/src/main/resources/log4j.properties b/flink-connector-influxdb/examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6a59e09
--- /dev/null
+++ b/flink-connector-influxdb/examples/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/pom.xml b/flink-connector-influxdb/pom.xml
new file mode 100644
index 0000000..dce012e
--- /dev/null
+++ b/flink-connector-influxdb/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-flink-parent_2.11</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-influxdb_2.11</artifactId>
+ <name>flink-connector-influxdb</name>
+
+ <packaging>jar</packaging>
+
+ <properties>
+ <influxdb-client.version>2.7</influxdb-client.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.influxdb</groupId>
+ <artifactId>influxdb-java</artifactId>
+ <version>${influxdb-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
new file mode 100644
index 0000000..9c1220d
--- /dev/null
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration for InfluxDB.
+ *
+ * <p>Instances are immutable and are created through {@link InfluxDBConfig.Builder}, see
+ * {@link #builder(String, String, String, String)}.
+ */
+public class InfluxDBConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final int DEFAULT_BATCH_ACTIONS = 2000;
+    private static final int DEFAULT_FLUSH_DURATION = 100;
+
+    private final String url;
+    private final String username;
+    private final String password;
+    private final String database;
+    private final int batchActions;
+    private final int flushDuration;
+    private final TimeUnit flushDurationTimeUnit;
+    private final boolean enableGzip;
+
+    /**
+     * Creates a configuration from the values collected by the given builder.
+     *
+     * @param builder the builder holding the configuration values
+     * @throws IllegalArgumentException if {@code builder} is null
+     * @throws NullPointerException if url, username, password or database is null, or if batching
+     *                              is enabled ({@code batchActions > 0}) and the flush duration
+     *                              time unit is null
+     */
+    public InfluxDBConfig(InfluxDBConfig.Builder builder) {
+        Preconditions.checkArgument(builder != null, "InfluxDBConfig builder can not be null");
+
+        this.url = Preconditions.checkNotNull(builder.getUrl(), "url can not be null");
+        this.username = Preconditions.checkNotNull(builder.getUsername(), "username can not be null");
+        this.password = Preconditions.checkNotNull(builder.getPassword(), "password can not be null");
+        this.database = Preconditions.checkNotNull(builder.getDatabase(), "database name can not be null");
+
+        this.batchActions = builder.getBatchActions();
+        this.flushDuration = builder.getFlushDuration();
+        this.flushDurationTimeUnit = builder.getFlushDurationTimeUnit();
+        this.enableGzip = builder.isEnableGzip();
+
+        // The time unit is only needed when batching is enabled; reject a missing unit here
+        // instead of letting the job fail later when the batch settings are applied.
+        if (this.batchActions > 0) {
+            Preconditions.checkNotNull(this.flushDurationTimeUnit,
+                    "flush duration time unit can not be null when batching is enabled");
+        }
+    }
+
+    /** @return the url to connect to */
+    public String getUrl() {
+        return url;
+    }
+
+    /** @return the username used to authorize against the InfluxDB instance */
+    public String getUsername() {
+        return username;
+    }
+
+    /** @return the password of the configured username */
+    public String getPassword() {
+        return password;
+    }
+
+    /** @return the name of the database to write to */
+    public String getDatabase() {
+        return database;
+    }
+
+    /** @return the number of points after which a batch is written; batching is disabled if not positive */
+    public int getBatchActions() {
+        return batchActions;
+    }
+
+    /** @return the flush duration, expressed in {@link #getFlushDurationTimeUnit()} */
+    public int getFlushDuration() {
+        return flushDuration;
+    }
+
+    /** @return the time unit of {@link #getFlushDuration()} */
+    public TimeUnit getFlushDurationTimeUnit() {
+        return flushDurationTimeUnit;
+    }
+
+    /** @return true if gzip compression of the http request body is enabled */
+    public boolean isEnableGzip() {
+        return enableGzip;
+    }
+
+    /**
+     * Creates a new {@link InfluxDBConfig.Builder} instance.
+     *
+     * <p>This is a convenience method for
+     * {@code new InfluxDBConfig.Builder(url, username, password, database)}.
+     *
+     * @param url      the url to connect to
+     * @param username the username which is used to authorize against the influxDB instance
+     * @param password the password for the username which is used to authorize against the influxDB
+     *                 instance
+     * @param database the name of the database to write
+     * @return the new InfluxDBConfig builder.
+     */
+    public static Builder builder(String url, String username, String password, String database) {
+        return new Builder(url, username, password, database);
+    }
+
+    /**
+     * A builder used to create an instance of {@link InfluxDBConfig}.
+     *
+     * <p>Unless overridden, batching is enabled with {@code 2000} batch actions and a flush
+     * duration of {@code 100} milliseconds, and gzip compression is disabled.
+     */
+    public static class Builder {
+        private String url;
+        private String username;
+        private String password;
+        private String database;
+        private int batchActions = DEFAULT_BATCH_ACTIONS;
+        private int flushDuration = DEFAULT_FLUSH_DURATION;
+        private TimeUnit flushDurationTimeUnit = TimeUnit.MILLISECONDS;
+        private boolean enableGzip = false;
+
+        /**
+         * Creates a builder.
+         *
+         * @param url      the url to connect to
+         * @param username the username which is used to authorize against the influxDB instance
+         * @param password the password for the username which is used to authorize against the influxDB
+         *                 instance
+         * @param database the name of the database to write
+         */
+        public Builder(String url, String username, String password, String database) {
+            this.url = url;
+            this.username = username;
+            this.password = password;
+            this.database = database;
+        }
+
+        /**
+         * Sets url.
+         *
+         * @param url the url to connect to
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder url(String url) {
+            this.url = url;
+            return this;
+        }
+
+        /**
+         * Sets username.
+         *
+         * @param username the username which is used to authorize against the influxDB instance
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /**
+         * Sets password.
+         *
+         * @param password the password for the username which is used to authorize against the influxDB
+         *                 instance
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * Sets database name.
+         *
+         * @param database the name of the database to write
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * Sets when to flush a new bulk request based on the number of batch actions currently added.
+         * Defaults to {@code 2000}. Can be set to {@code -1} to disable it.
+         *
+         * @param batchActions number of Points written after which a write must happen.
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder batchActions(int batchActions) {
+            this.batchActions = batchActions;
+            return this;
+        }
+
+        /**
+         * Sets a flush interval flushing *any* bulk actions pending if the interval passes.
+         * Defaults to {@code 100} milliseconds.
+         *
+         * @param flushDuration         the flush duration
+         * @param flushDurationTimeUnit the TimeUnit of the flush duration
+         * @return this Builder to use it fluent
+         */
+        public Builder flushDuration(int flushDuration, TimeUnit flushDurationTimeUnit) {
+            this.flushDuration = flushDuration;
+            this.flushDurationTimeUnit = flushDurationTimeUnit;
+            return this;
+        }
+
+        /**
+         * Enable Gzip compress for http request body. Disabled by default.
+         *
+         * @param enableGzip the enableGzip value
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder enableGzip(boolean enableGzip) {
+            this.enableGzip = enableGzip;
+            return this;
+        }
+
+        /**
+         * Builds InfluxDBConfig.
+         *
+         * @return the InfluxDBConfig instance.
+         * @throws NullPointerException if a required value is missing, see
+         *                              {@link InfluxDBConfig#InfluxDBConfig(Builder)}
+         */
+        public InfluxDBConfig build() {
+            return new InfluxDBConfig(this);
+        }
+
+
+        public String getUrl() {
+            return url;
+        }
+
+        public String getUsername() {
+            return username;
+        }
+
+        public String getPassword() {
+            return password;
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public int getBatchActions() {
+            return batchActions;
+        }
+
+        public int getFlushDuration() {
+            return flushDuration;
+        }
+
+        public TimeUnit getFlushDurationTimeUnit() {
+            return flushDurationTimeUnit;
+        }
+
+        public boolean isEnableGzip() {
+            return enableGzip;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java
new file mode 100644
index 0000000..3be9c90
--- /dev/null
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A single data point destined for InfluxDB: a measurement name, a timestamp, a set of
+ * tags and a set of fields.
+ */
+public class InfluxDBPoint {
+
+    private String measurement;
+    private long timestamp;
+    private Map<String, String> tags;
+    private Map<String, Object> fields;
+
+    /**
+     * Creates a point that starts out with empty, mutable tag and field maps.
+     *
+     * @param measurement the name of the measurement
+     * @param timestamp   the timestamp of the point
+     */
+    public InfluxDBPoint(String measurement, long timestamp) {
+        this(measurement, timestamp, new HashMap<String, String>(), new HashMap<String, Object>());
+    }
+
+    /**
+     * Creates a point backed by the given maps; the maps are stored as passed, not copied.
+     *
+     * @param measurement the name of the measurement
+     * @param timestamp   the timestamp of the point
+     * @param tags        the tags of the point
+     * @param fields      the fields of the point
+     */
+    public InfluxDBPoint(String measurement, long timestamp, Map<String, String> tags, Map<String, Object> fields) {
+        this.measurement = measurement;
+        this.timestamp = timestamp;
+        this.tags = tags;
+        this.fields = fields;
+    }
+
+    /** @return the name of the measurement */
+    public String getMeasurement() {
+        return this.measurement;
+    }
+
+    /** @param measurement the new name of the measurement */
+    public void setMeasurement(String measurement) {
+        this.measurement = measurement;
+    }
+
+    /** @return the timestamp of the point */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** @param timestamp the new timestamp of the point */
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /** @return the tag map held by this point (not a copy) */
+    public Map<String, String> getTags() {
+        return this.tags;
+    }
+
+    /** @param tags the map that replaces the current tags */
+    public void setTags(Map<String, String> tags) {
+        this.tags = tags;
+    }
+
+    /** @return the field map held by this point (not a copy) */
+    public Map<String, Object> getFields() {
+        return this.fields;
+    }
+
+    /** @param fields the map that replaces the current fields */
+    public void setFields(Map<String, Object> fields) {
+        this.fields = fields;
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
new file mode 100644
index 0000000..03521b9
--- /dev/null
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sink to save data into a InfluxDB cluster.
+ *
+ * <p>Every incoming {@link InfluxDBPoint} is converted into an InfluxDB {@link Point} and written
+ * to the database named in the {@link InfluxDBConfig}.
+ */
+public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> {
+
+    private static final long serialVersionUID = 1L;
+
+    // Created in open(); stays null if open() was never called or failed before connecting.
+    private transient InfluxDB influxDBClient;
+    private final InfluxDBConfig influxDBConfig;
+
+    /**
+     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
+     *
+     * @param influxDBConfig The configuration of {@link InfluxDBConfig}
+     * @throws NullPointerException if {@code influxDBConfig} is null
+     */
+    public InfluxDBSink(InfluxDBConfig influxDBConfig) {
+        this.influxDBConfig = Preconditions.checkNotNull(influxDBConfig, "InfluxDB client config should not be null");
+    }
+
+    /**
+     * Connects to the InfluxDB server named in the configuration, selects the configured database
+     * and applies the batching and gzip settings.
+     *
+     * @param parameters the configuration passed on to the super class
+     * @throws RuntimeException if the configured database does not exist
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        influxDBClient = InfluxDBFactory.connect(influxDBConfig.getUrl(), influxDBConfig.getUsername(), influxDBConfig.getPassword());
+
+        if (!influxDBClient.databaseExists(influxDBConfig.getDatabase())) {
+            throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!");
+        }
+
+        influxDBClient.setDatabase(influxDBConfig.getDatabase());
+
+        // A non-positive number of batch actions means batching stays disabled.
+        if (influxDBConfig.getBatchActions() > 0) {
+            influxDBClient.enableBatch(influxDBConfig.getBatchActions(), influxDBConfig.getFlushDuration(), influxDBConfig.getFlushDurationTimeUnit());
+        }
+
+        if (influxDBConfig.isEnableGzip()) {
+            influxDBClient.enableGzip();
+        }
+    }
+
+    /**
+     * Called when new data arrives to the sink, and forwards it to InfluxDB.
+     * The timestamp of the data point is interpreted as milliseconds.
+     *
+     * @param dataPoint {@link InfluxDBPoint}
+     * @throws RuntimeException if the measurement of the data point is null or blank
+     */
+    @Override
+    public void invoke(InfluxDBPoint dataPoint) throws Exception {
+        if (StringUtils.isNullOrWhitespaceOnly(dataPoint.getMeasurement())) {
+            throw new RuntimeException("No measurement defined");
+        }
+
+        Point.Builder builder = Point.measurement(dataPoint.getMeasurement())
+                .time(dataPoint.getTimestamp(), TimeUnit.MILLISECONDS);
+
+        // Null or empty maps are skipped rather than handed to the point builder.
+        if (!CollectionUtil.isNullOrEmpty(dataPoint.getFields())) {
+            builder.fields(dataPoint.getFields());
+        }
+
+        if (!CollectionUtil.isNullOrEmpty(dataPoint.getTags())) {
+            builder.tag(dataPoint.getTags());
+        }
+
+        Point point = builder.build();
+        influxDBClient.write(point);
+    }
+
+    /**
+     * Disables batching if it is enabled and closes the InfluxDB client.
+     *
+     * <p>Does nothing if no client was created, so calling it after a failed or missing
+     * {@link #open(Configuration)} does not raise a {@link NullPointerException}.
+     */
+    @Override
+    public void close() {
+        InfluxDB client = influxDBClient;
+        if (client == null) {
+            return;
+        }
+        influxDBClient = null;
+
+        try {
+            if (client.isBatchEnabled()) {
+                client.disableBatch();
+            }
+        } finally {
+            // Release the client even if disabling the batch mode failed.
+            client.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2e39a83..4070698 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
<module>flink-connector-activemq</module>
<module>flink-connector-netty</module>
<module>flink-connector-akka</module>
+ <module>flink-connector-influxdb</module>
</modules>
<properties>
[2/2] bahir-flink git commit: As per mailing list discussion,
remove Java 7
Posted by rm...@apache.org.
As per mailing list discussion, remove Java 7
Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/4f0179a1
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/4f0179a1
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/4f0179a1
Branch: refs/heads/master
Commit: 4f0179a173cd2cc8f6b441f6af2602d3b661aeb2
Parents: f07276e
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Sep 20 15:06:10 2017 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Sep 20 15:06:10 2017 +0200
----------------------------------------------------------------------
.travis.yml | 8 --------
1 file changed, 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/4f0179a1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 894ee36..c3baf70 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -46,14 +46,6 @@ matrix:
env:
- FLINK_VERSION="1.3.0" SCALA_VERSION="2.10"
- CACHE_NAME=JDK8_F130_D
- - jdk: openjdk7
- env:
- - FLINK_VERSION="1.3.0" SCALA_VERSION="2.11"
- - CACHE_NAME=JDK7_F130_A
- - jdk: openjdk7
- env:
- - FLINK_VERSION="1.3.0" SCALA_VERSION="2.10"
- - CACHE_NAME=JDK7_F130_B
before_install:
- ./dev/change-scala-version.sh $SCALA_VERSION