You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by rm...@apache.org on 2017/09/20 13:13:29 UTC

[1/2] bahir-flink git commit: [BAHIR-134] Add InfluxDb sink for flink stream

Repository: bahir-flink
Updated Branches:
  refs/heads/master b580566f0 -> 4f0179a17


[BAHIR-134] Add InfluxDb sink for flink stream

This closes #21


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/f07276ee
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/f07276ee
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/f07276ee

Branch: refs/heads/master
Commit: f07276eef2babc52ffdb43c5fcb76f9d51b9153f
Parents: b580566
Author: zhouhai02 <zh...@meituan.com>
Authored: Sun Aug 27 19:35:31 2017 +0800
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Sep 20 15:05:28 2017 +0200

----------------------------------------------------------------------
 flink-connector-influxdb/README.md              |  32 +++
 .../examples/influxdb/InfluxDBSinkExample.java  |  94 +++++++
 .../src/main/resources/log4j.properties         |  23 ++
 flink-connector-influxdb/pom.xml                |  78 ++++++
 .../connectors/influxdb/InfluxDBConfig.java     | 257 +++++++++++++++++++
 .../connectors/influxdb/InfluxDBPoint.java      |  78 ++++++
 .../connectors/influxdb/InfluxDBSink.java       | 106 ++++++++
 pom.xml                                         |   1 +
 8 files changed, 669 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/README.md b/flink-connector-influxdb/README.md
new file mode 100644
index 0000000..0f9e477
--- /dev/null
+++ b/flink-connector-influxdb/README.md
@@ -0,0 +1,32 @@
+# Flink InfluxDB Connector
+
+This connector provides a sink that can send data to [InfluxDB](https://www.influxdata.com/). To use this connector, add the
+following dependency to your project:
+
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>flink-connector-influxdb_2.11</artifactId>
+      <version>1.1-SNAPSHOT</version>
+    </dependency>
+
+*Version Compatibility*: This module is compatible with InfluxDB 1.3.x   
+*Requirements*: Java 1.8+
+
+Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
+See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html).
+ 
+## Installing InfluxDB
+Follow the instructions from the [InfluxDB download page](https://portal.influxdata.com/downloads#influxdb).
+  
+## Examples
+
+### JAVA API
+
+    DataStream<InfluxDBPoint> dataStream = ...
+    InfluxDBConfig influxDBConfig = InfluxDBConfig.builder(url, username, password, dbName).build();
+    dataStream.addSink(new InfluxDBSink(influxDBConfig));
+
+
+See end-to-end examples at [InfluxDB Examples](https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb/examples)
+
+    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java b/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java
new file mode 100644
index 0000000..3047743
--- /dev/null
+++ b/flink-connector-influxdb/examples/src/main/java/org/apache/flink/streaming/examples/influxdb/InfluxDBSinkExample.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.influxdb;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.influxdb.InfluxDBConfig;
+import org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint;
+import org.apache.flink.streaming.connectors.influxdb.InfluxDBSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is an example showing how to use the InfluxDB Sink in the Streaming API.
+ * <p>
+ * <p>The example assumes that a database exists in a local InfluxDB server, according to the following query:
+ * <p>curl -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE db_flink_test"
+ */
+public class InfluxDBSinkExample {
+    private static final Logger LOG = LoggerFactory.getLogger(InfluxDBSinkExample.class);
+
+    private static final int N = 10000;
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        List<String> dataList = new ArrayList<>();
+        for (int i = 0; i < N; ++i) {
+            String id = "server" + String.valueOf(i);
+            dataList.add("cpu#" + id);
+            dataList.add("mem#" + id);
+            dataList.add("disk#" + id);
+        }
+        DataStream<String> source = env.fromElements(dataList.toArray(new String[0]));
+
+
+        DataStream<InfluxDBPoint> dataStream = source.map(
+                new RichMapFunction<String, InfluxDBPoint>() {
+                    @Override
+                    public InfluxDBPoint map(String s) throws Exception {
+                        String[] input = s.split("#");
+
+                        String measurement = input[0];
+                        long timestamp = System.currentTimeMillis();
+
+                        HashMap<String, String> tags = new HashMap<>();
+                        tags.put("host", input[1]);
+                        tags.put("region", "region#" + String.valueOf(input[1].hashCode() % 20));
+
+                        HashMap<String, Object> fields = new HashMap<>();
+                        fields.put("value1", input[1].hashCode() % 100);
+                        fields.put("value2", input[1].hashCode() % 50);
+
+                        return new InfluxDBPoint(measurement, timestamp, tags, fields);
+                    }
+                }
+        );
+
+        //dataStream.print();
+
+        //InfluxDBConfig influxDBConfig = new InfluxDBConfig.Builder("http://localhost:8086", "root", "root", "db_flink_test")
+        InfluxDBConfig influxDBConfig = InfluxDBConfig.builder("http://localhost:8086", "root", "root", "db_flink_test")
+                .batchActions(1000)
+                .flushDuration(100, TimeUnit.MILLISECONDS)
+                .enableGzip(true)
+                .build();
+
+        dataStream.addSink(new InfluxDBSink(influxDBConfig));
+
+        env.execute("InfluxDB Sink Example");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/examples/src/main/resources/log4j.properties b/flink-connector-influxdb/examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6a59e09
--- /dev/null
+++ b/flink-connector-influxdb/examples/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/pom.xml b/flink-connector-influxdb/pom.xml
new file mode 100644
index 0000000..dce012e
--- /dev/null
+++ b/flink-connector-influxdb/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-influxdb_2.11</artifactId>
+    <name>flink-connector-influxdb</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <influxdb-client.version>2.7</influxdb-client.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.influxdb</groupId>
+            <artifactId>influxdb-java</artifactId>
+            <version>${influxdb-client.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
new file mode 100644
index 0000000..9c1220d
--- /dev/null
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration for InfluxDB.
+ */
+public class InfluxDBConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final int DEFAULT_BATCH_ACTIONS = 2000;
+    private static final int DEFAULT_FLUSH_DURATION = 100;
+
+    private String url;
+    private String username;
+    private String password;
+    private String database;
+    private int batchActions;
+    private int flushDuration;
+    private TimeUnit flushDurationTimeUnit;
+    private boolean enableGzip;
+
+    public InfluxDBConfig(InfluxDBConfig.Builder builder) {
+        Preconditions.checkArgument(builder != null, "InfluxDBConfig builder can not be null");
+
+        this.url = Preconditions.checkNotNull(builder.getUrl(), "host can not be null");
+        this.username = Preconditions.checkNotNull(builder.getUsername(), "username can not be null");
+        this.password = Preconditions.checkNotNull(builder.getPassword(), "password can not be null");
+        this.database = Preconditions.checkNotNull(builder.getDatabase(), "database name can not be null");
+
+        this.batchActions = builder.getBatchActions();
+        this.flushDuration = builder.getFlushDuration();
+        this.flushDurationTimeUnit = builder.getFlushDurationTimeUnit();
+        this.enableGzip = builder.isEnableGzip();
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public int getBatchActions() {
+        return batchActions;
+    }
+
+    public int getFlushDuration() {
+        return flushDuration;
+    }
+
+    public TimeUnit getFlushDurationTimeUnit() {
+        return flushDurationTimeUnit;
+    }
+
+    public boolean isEnableGzip() {
+        return enableGzip;
+    }
+
+    /**
+     * Creates a new {@link InfluxDBConfig.Builder} instance.
+     * <p>
+     * This is a convenience method for {@code new InfluxDBConfig.Builder(url, username, password, database)}.
+     *
+     * @param url      the url to connect to
+     * @param username the username which is used to authorize against the influxDB instance
+     * @param password the password for the username which is used to authorize against the influxDB
+     *                 instance
+     * @param database the name of the database to write
+     * @return the new InfluxDBConfig builder.
+     */
+    public static Builder builder(String url, String username, String password, String database) {
+        return new Builder(url, username, password, database);
+    }
+
+    /**
+     * A builder used to create an instance of {@link InfluxDBConfig}.
+     */
+    public static class Builder {
+        private String url;
+        private String username;
+        private String password;
+        private String database;
+        private int batchActions = DEFAULT_BATCH_ACTIONS;
+        private int flushDuration = DEFAULT_FLUSH_DURATION;
+        private TimeUnit flushDurationTimeUnit = TimeUnit.MILLISECONDS;
+        private boolean enableGzip = false;
+
+        /**
+         * Creates a builder
+         *
+         * @param url      the url to connect to
+         * @param username the username which is used to authorize against the influxDB instance
+         * @param password the password for the username which is used to authorize against the influxDB
+         *                 instance
+         * @param database the name of the database to write
+         */
+        public Builder(String url, String username, String password, String database) {
+            this.url = url;
+            this.username = username;
+            this.password = password;
+            this.database = database;
+        }
+
+        /**
+         * Sets url.
+         *
+         * @param url the url to connect to
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder url(String url) {
+            this.url = url;
+            return this;
+        }
+
+        /**
+         * Sets username.
+         *
+         * @param username the username which is used to authorize against the influxDB instance
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder username(String username) {
+            this.username = username;
+            return this;
+        }
+
+        /**
+         * Sets password.
+         *
+         * @param password the password for the username which is used to authorize against the influxDB
+         *                 instance
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder password(String password) {
+            this.password = password;
+            return this;
+        }
+
+        /**
+         * Sets database name.
+         *
+         * @param database the name of the database to write
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder database(String database) {
+            this.database = database;
+            return this;
+        }
+
+        /**
+         * Sets when to flush a new bulk request based on the number of batch actions currently added.
+         * Defaults to <tt>DEFAULT_BATCH_ACTIONS</tt>. Can be set to <tt>-1</tt> to disable it.
+         *
+         * @param batchActions number of Points written after which a write must happen.
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder batchActions(int batchActions) {
+            this.batchActions = batchActions;
+            return this;
+        }
+
+        /**
+         * Sets a flush interval flushing *any* bulk actions pending if the interval passes.
+         *
+         * @param flushDuration         the flush duration
+         * @param flushDurationTimeUnit the TimeUnit of the flush duration
+         * @return this Builder to use it fluent
+         */
+        public Builder flushDuration(int flushDuration, TimeUnit flushDurationTimeUnit) {
+            this.flushDuration = flushDuration;
+            this.flushDurationTimeUnit = flushDurationTimeUnit;
+            return this;
+        }
+
+        /**
+         * Enables gzip compression of the HTTP request body.
+         *
+         * @param enableGzip the enableGzip value
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder enableGzip(boolean enableGzip) {
+            this.enableGzip = enableGzip;
+            return this;
+        }
+
+        /**
+         * Builds InfluxDBConfig.
+         *
+         * @return the InfluxDBConfig instance.
+         */
+        public InfluxDBConfig build() {
+            return new InfluxDBConfig(this);
+        }
+
+
+        public String getUrl() {
+            return url;
+        }
+
+        public String getUsername() {
+            return username;
+        }
+
+        public String getPassword() {
+            return password;
+        }
+
+        public String getDatabase() {
+            return database;
+        }
+
+        public int getBatchActions() {
+            return batchActions;
+        }
+
+        public int getFlushDuration() {
+            return flushDuration;
+        }
+
+        public TimeUnit getFlushDurationTimeUnit() {
+            return flushDurationTimeUnit;
+        }
+
+        public boolean isEnableGzip() {
+            return enableGzip;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java
new file mode 100644
index 0000000..3be9c90
--- /dev/null
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBPoint.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Representation of an InfluxDB database Point.
+ */
+public class InfluxDBPoint {
+
+    private String measurement;
+    private long timestamp;
+    private Map<String, String> tags;
+    private Map<String, Object> fields;
+
+    public InfluxDBPoint(String measurement, long timestamp) {
+        this.measurement = measurement;
+        this.timestamp = timestamp;
+        this.fields = new HashMap<>();
+        this.tags = new HashMap<>();
+    }
+
+    public InfluxDBPoint(String measurement, long timestamp, Map<String, String> tags, Map<String, Object> fields) {
+        this.measurement = measurement;
+        this.timestamp = timestamp;
+        this.tags = tags;
+        this.fields = fields;
+    }
+
+    public String getMeasurement() {
+        return measurement;
+    }
+
+    public void setMeasurement(String measurement) {
+        this.measurement = measurement;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public Map<String, String> getTags() {
+        return tags;
+    }
+
+    public void setTags(Map<String, String> tags) {
+        this.tags = tags;
+    }
+
+    public Map<String, Object> getFields() {
+        return fields;
+    }
+
+    public void setFields(Map<String, Object> fields) {
+        this.fields = fields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
new file mode 100644
index 0000000..03521b9
--- /dev/null
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.influxdb;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sink to save data into an InfluxDB cluster.
+ */
+public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> {
+
+    private transient InfluxDB influxDBClient;
+    private final InfluxDBConfig influxDBConfig;
+
+    /**
+     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
+     *
+     * @param influxDBConfig The configuration of {@link InfluxDBConfig}
+     */
+    public InfluxDBSink(InfluxDBConfig influxDBConfig) {
+        this.influxDBConfig = Preconditions.checkNotNull(influxDBConfig, "InfluxDB client config should not be null");
+    }
+
+    /**
+     * Initializes the connection to InfluxDB, verifies that the configured database exists, and applies the batch and gzip settings.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        influxDBClient = InfluxDBFactory.connect(influxDBConfig.getUrl(), influxDBConfig.getUsername(), influxDBConfig.getPassword());
+
+        if (!influxDBClient.databaseExists(influxDBConfig.getDatabase())) {
+            throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!");
+        }
+
+        influxDBClient.setDatabase(influxDBConfig.getDatabase());
+
+        if (influxDBConfig.getBatchActions() > 0) {
+            influxDBClient.enableBatch(influxDBConfig.getBatchActions(), influxDBConfig.getFlushDuration(), influxDBConfig.getFlushDurationTimeUnit());
+        }
+
+        if (influxDBConfig.isEnableGzip()) {
+
+            influxDBClient.enableGzip();
+        }
+    }
+
+    /**
+     * Called when new data arrives to the sink, and forwards it to InfluxDB.
+     *
+     * @param dataPoint {@link InfluxDBPoint}
+     */
+    @Override
+    public void invoke(InfluxDBPoint dataPoint) throws Exception {
+        if (StringUtils.isNullOrWhitespaceOnly(dataPoint.getMeasurement())) {
+            throw new RuntimeException("No measurement defined");
+        }
+
+        Point.Builder builder = Point.measurement(dataPoint.getMeasurement())
+                .time(dataPoint.getTimestamp(), TimeUnit.MILLISECONDS);
+
+        if (!CollectionUtil.isNullOrEmpty(dataPoint.getFields())) {
+            builder.fields(dataPoint.getFields());
+        }
+
+        if (!CollectionUtil.isNullOrEmpty(dataPoint.getTags())) {
+            builder.tag(dataPoint.getTags());
+        }
+
+        Point point = builder.build();
+        influxDBClient.write(point);
+    }
+
+    @Override
+    public void close() {
+        if (influxDBClient.isBatchEnabled()) {
+            influxDBClient.disableBatch();
+        }
+        influxDBClient.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f07276ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2e39a83..4070698 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
     <module>flink-connector-activemq</module>
     <module>flink-connector-netty</module>
     <module>flink-connector-akka</module>
+    <module>flink-connector-influxdb</module>
   </modules>
 
   <properties>


[2/2] bahir-flink git commit: As per mailing list discussion, remove Java 7

Posted by rm...@apache.org.
As per mailing list discussion, remove Java 7


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/4f0179a1
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/4f0179a1
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/4f0179a1

Branch: refs/heads/master
Commit: 4f0179a173cd2cc8f6b441f6af2602d3b661aeb2
Parents: f07276e
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Sep 20 15:06:10 2017 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Sep 20 15:06:10 2017 +0200

----------------------------------------------------------------------
 .travis.yml | 8 --------
 1 file changed, 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/4f0179a1/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 894ee36..c3baf70 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -46,14 +46,6 @@ matrix:
       env:
         - FLINK_VERSION="1.3.0" SCALA_VERSION="2.10"
         - CACHE_NAME=JDK8_F130_D
-    - jdk: openjdk7
-      env:
-        - FLINK_VERSION="1.3.0" SCALA_VERSION="2.11"
-        - CACHE_NAME=JDK7_F130_A
-    - jdk: openjdk7
-      env:
-        - FLINK_VERSION="1.3.0" SCALA_VERSION="2.10"
-        - CACHE_NAME=JDK7_F130_B
 
 before_install:
   - ./dev/change-scala-version.sh $SCALA_VERSION