You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/11/07 20:28:54 UTC

[1/3] bahir-flink git commit: [BAHIR-114] update flume to 1.8 and add some tests

Repository: bahir-flink
Updated Branches:
  refs/heads/master 8b1011803 -> 898e913fe


[BAHIR-114] update flume to 1.8 and add some tests

Closes #33


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/72cbe807
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/72cbe807
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/72cbe807

Branch: refs/heads/master
Commit: 72cbe807294cad84a1ef2919e4205c7c37ec861e
Parents: 8b10118
Author: Joao Boto <bo...@boto.pro>
Authored: Wed Aug 22 12:10:32 2018 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Nov 7 11:56:39 2018 -0800

----------------------------------------------------------------------
 .travis.yml                                     |  59 +++++-----
 flink-connector-flume/README.md                 |   2 +-
 flink-connector-flume/dockers/conf/sink.conf    |  34 ++++++
 flink-connector-flume/dockers/conf/source.conf  |  33 ++++++
 .../dockers/docker-compose.yml                  |  57 +++++++++
 flink-connector-flume/pom.xml                   |  95 +++------------
 .../connectors/flume/FlumeRpcClient.java        | 118 +++++++++++++++++++
 .../streaming/connectors/flume/FlumeSink.java   | 100 ++--------------
 .../connectors/flume/FlumeRpcClientTest.java    |  68 +++++++++++
 .../connectors/flume/FlumeSinkTest.java         |  39 ++++++
 .../src/test/resources/log4j.properties         |  27 +++++
 pom.xml                                         |   1 -
 12 files changed, 434 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 691667c..3049224 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,49 +18,48 @@
 sudo: required
 dist: trusty
 
-cache:
-  directories:
-  - $HOME/.m2
+language: java
 
 # do not cache our own artifacts
 before_cache:
   - rm -rf $HOME/.m2/repository/org/apache/flink/
 
-language: java
+cache:
+  directories:
+  - $HOME/.m2
 
 services:
   - docker
 
-matrix:
-  include:
-    - jdk: oraclejdk8
-      env:
-        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
-        - MAVEN_PROFILE="default"
-        - CACHE_NAME=JDK8_F130_A
-    - jdk: openjdk8
-      env:
-        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
-        - CACHE_NAME=JDK8_F130_C
-        - MAVEN_PROFILE="default"
-        - CACHE_NAME=JDK8_F130_B
-    - jdk: openjdk8
-      env:
-        - FLINK_VERSION="1.5.1" SCALA_VERSION="2.11"
-        - MAVEN_PROFILE="test-kudu"
-        - CACHE_NAME=JDK8_F130_KUDU
+jdk:
+  - oraclejdk8
+  - openjdk8
+
+env:
+  - |
+    FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="false"
+    PROJECTS="flink-connector-activemq,flink-connector-akka,flink-connector-influxdb,flink-connector-netty,flink-connector-redis,flink-library-siddhi"
+  - |
+    FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="true"
+    PROJECTS="flink-connector-flume"
+  - |
+    FLINK_VERSION="1.5.1" SCALA_VERSION="2.11" DOCKER="true"
+    PROJECTS="flink-connector-kudu"
+
 
 before_install:
   - ./dev/change-scala-version.sh $SCALA_VERSION
 
 install: true
 
-script:
-  - |
-    if [[ $MAVEN_PROFILE == "default" ]]; then
-      mvn clean verify -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
-    fi
-  - |
-    if [[ $MAVEN_PROFILE == "test-kudu" ]]; then
-      flink-connector-kudu/dockers/run_kudu_tests.sh
+before_script:
+  - if [[ $DOCKER == "true" ]]; then
+    docker-compose -f "$PROJECTS/dockers/docker-compose.yml" up -d;
     fi
+
+script: mvn clean verify -pl $PROJECTS -Pscala-$SCALA_VERSION -Dflink.version=$FLINK_VERSION
+
+after_script:
+  - if [[ $DOCKER == "true" ]]; then
+    docker-compose -f "$PROJECTS/dockers/docker-compose.yml" down;
+    fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/README.md
----------------------------------------------------------------------
diff --git a/flink-connector-flume/README.md b/flink-connector-flume/README.md
index c7d7f67..ebcd3f1 100644
--- a/flink-connector-flume/README.md
+++ b/flink-connector-flume/README.md
@@ -9,7 +9,7 @@ following dependency to your project:
       <version>1.1-SNAPSHOT</version>
     </dependency>
 
-*Version Compatibility*: This module is compatible with Flume 1.5.0.
+*Version Compatibility*: This module is compatible with Flume 1.8.0.
 
 Note that the streaming connectors are not part of the binary distribution of Flink. You need to link them into your job jar for cluster execution.
 See how to link with them for cluster execution [here](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/linking.html).

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/dockers/conf/sink.conf
----------------------------------------------------------------------
diff --git a/flink-connector-flume/dockers/conf/sink.conf b/flink-connector-flume/dockers/conf/sink.conf
new file mode 100644
index 0000000..81c246f
--- /dev/null
+++ b/flink-connector-flume/dockers/conf/sink.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+docker.sinks = fileSink
+docker.sources = avroSource
+docker.channels = inMemoryChannel
+
+docker.sources.avroSource.type = avro
+docker.sources.avroSource.channels = c1
+docker.sources.avroSource.bind = 0.0.0.0
+docker.sources.avroSource.port = 4545
+docker.sources.avroSource.channels = inMemoryChannel
+
+docker.channels.inMemoryChannel.type = memory
+docker.channels.inMemoryChannel.capacity = 1000
+docker.channels.inMemoryChannel.transactionCapacity = 100
+
+docker.sinks.fileSink.type = file_roll
+docker.sinks.fileSink.channel = inMemoryChannel
+docker.sinks.fileSink.sink.directory = /var/tmp/output
+docker.sinks.fileSink.rollInterval = 0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/dockers/conf/source.conf
----------------------------------------------------------------------
diff --git a/flink-connector-flume/dockers/conf/source.conf b/flink-connector-flume/dockers/conf/source.conf
new file mode 100644
index 0000000..f883f41
--- /dev/null
+++ b/flink-connector-flume/dockers/conf/source.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+docker.sinks = avroSink
+docker.sources = netcatSource
+docker.channels = inMemoryChannel
+
+docker.sources.netcatSource.type = avro
+docker.sources.netcatSource.bind = 0.0.0.0
+docker.sources.netcatSource.port = 44444
+docker.sources.netcatSource.channels = inMemoryChannel
+
+docker.channels.inMemoryChannel.type = memory
+docker.channels.inMemoryChannel.capacity = 1000
+docker.channels.inMemoryChannel.transactionCapacity = 100
+
+docker.sinks.avroSink.type = avro
+docker.sinks.avroSink.channel = inMemoryChannel
+docker.sinks.avroSink.hostname = sink
+docker.sinks.avroSink.port = 4545

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/dockers/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-connector-flume/dockers/docker-compose.yml b/flink-connector-flume/dockers/docker-compose.yml
new file mode 100644
index 0000000..042bd5e
--- /dev/null
+++ b/flink-connector-flume/dockers/docker-compose.yml
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+version: '2'
+
+services:
+
+  source:
+    image: eskabetxe/flume
+    container_name: flume-source
+    hostname: 172.25.0.3
+    ports:
+      - "44444:44444"
+    volumes:
+      - ./conf/source.conf:/opt/flume-config/flume.conf
+    environment:
+      - FLUME_AGENT_NAME=docker
+    links:
+      - "sink:sink"
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.3
+
+  sink:
+    image:  eskabetxe/flume
+    container_name: flume-sink
+    hostname: 172.25.0.4
+    volumes:
+      - ./conf/sink.conf:/opt/flume-config/flume.conf
+      - ./output:/var/tmp/output
+    environment:
+      - FLUME_AGENT_NAME=docker
+    networks:
+      mynet:
+        ipv4_address: 172.25.0.4
+
+networks:
+  mynet:
+    driver: bridge
+    ipam:
+      config:
+      - subnet: 172.25.0.0/24
+        IPRange: 172.25.0.2/24,
+        gateway: 172.25.0.1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index 1f4cf6d..c202c6d 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -35,7 +35,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<flume-ng.version>1.5.0</flume-ng.version>
+		<flume-ng.version>1.8.0</flume-ng.version>
 	</properties>
 
 	<dependencies>
@@ -50,86 +50,23 @@ under the License.
 			<groupId>org.apache.flume</groupId>
 			<artifactId>flume-ng-core</artifactId>
 			<version>${flume-ng.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-io</groupId>
-					<artifactId>commons-io</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-codec</groupId>
-					<artifactId>commons-codec</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-cli</groupId>
-					<artifactId>commons-cli</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-lang</groupId>
-					<artifactId>commons-lang</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.avro</groupId>
-					<artifactId>avro</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-core-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-mapper-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.thoughtworks.paranamer</groupId>
-					<artifactId>paranamer</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.tukaani</groupId>
-					<artifactId>xz</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.velocity</groupId>
-					<artifactId>velocity</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-collections</groupId>
-					<artifactId>commons-collections</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>servlet-api</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.google.code.gson</groupId>
-					<artifactId>gson</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.thrift</groupId>
-					<artifactId>libthrift</artifactId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
+		<dependency>
+			<groupId>org.junit.jupiter</groupId>
+			<artifactId>junit-jupiter-api</artifactId>
+			<version>5.2.0</version>
+			<scope>test</scope>
+		</dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
new file mode 100644
index 0000000..e918f56
--- /dev/null
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClient.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+
+class FlumeRpcClient implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlumeRpcClient.class);
+
+    protected RpcClient client;
+    private String hostname;
+    private int port;
+
+
+    FlumeRpcClient(String hostname, int port) {
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    /**
+     * Initializes the connection to Apache Flume.
+     */
+    public boolean init() {
+        // Setup the RPC connection
+        int initCounter = 0;
+        while (true) {
+            verifyCounter(initCounter, "Cannot establish connection");
+
+            try {
+                this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+            } catch (FlumeException e) {
+                // Wait one second if the connection failed before the next
+                // try
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    if (LOG.isErrorEnabled()) {
+                        LOG.error("Interrupted while trying to connect {} on {}", hostname, port);
+                    }
+                }
+            }
+            if (client != null) {
+                break;
+            }
+            initCounter++;
+        }
+        return client.isActive();
+    }
+
+
+    public boolean sendData(String data) {
+        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
+        return sendData(event);
+    }
+    public boolean sendData(byte[] data) {
+        Event event = EventBuilder.withBody(data);
+        return sendData(event);
+    }
+
+    private boolean sendData(Event event) {
+        return sendData(event, 0);
+    }
+    private boolean sendData(Event event, int retryCount) {
+        verifyCounter(retryCount, "Cannot send message");
+        try {
+            client.append(event);
+            return true;
+        } catch (EventDeliveryException e) {
+            // clean up and recreate the client
+            reconnect();
+            return sendData(event, ++retryCount);
+        }
+    }
+
+
+    private void verifyCounter(int counter, String messaje) {
+        if (counter >= 10) {
+            throw new RuntimeException(messaje + " on " + hostname + " on " + port);
+        }
+    }
+
+    private void reconnect() {
+        close();
+        client = null;
+        init();
+    }
+
+    @Override
+    public void close() {
+        if (this.client == null) return;
+
+        this.client.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 41b1b25..7a80fd2 100644
--- a/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -14,32 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class FlumeSink<IN> extends RichSinkFunction<IN> {
-    private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+    private transient FlumeRpcClient client;
 
-    private transient FlinkRpcClientFacade client;
-    boolean initDone = false;
-    String host;
-    int port;
-    SerializationSchema<IN> schema;
+    private String host;
+    private int port;
+    private SerializationSchema<IN> schema;
 
     public FlumeSink(String host, int port, SerializationSchema<IN> schema) {
         this.host = host;
@@ -57,84 +45,20 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
     @Override
     public void invoke(IN value, Context context) throws Exception {
         byte[] data = schema.serialize(value);
-        client.sendDataToFlume(data);
-
+        client.sendData(data);
     }
 
-    private class FlinkRpcClientFacade {
-        private RpcClient client;
-        private String hostname;
-        private int port;
-
-        /**
-         * Initializes the connection to Apache Flume.
-         *
-         * @param hostname
-         *            The host
-         * @param port
-         *            The port.
-         */
-        public void init(String hostname, int port) {
-            // Setup the RPC connection
-            this.hostname = hostname;
-            this.port = port;
-            int initCounter = 0;
-            while (true) {
-                if (initCounter >= 90) {
-                    throw new RuntimeException("Cannot establish connection with" + port + " at "
-                            + host);
-                }
-                try {
-                    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
-                } catch (FlumeException e) {
-                    // Wait one second if the connection failed before the next
-                    // try
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e1) {
-                        if (LOG.isErrorEnabled()) {
-                            LOG.error("Interrupted while trying to connect {} at {}", port, host);
-                        }
-                    }
-                }
-                if (client != null) {
-                    break;
-                }
-                initCounter++;
-            }
-            initDone = true;
-        }
-
-        /**
-         * Sends byte arrays as {@link Event} series to Apache Flume.
-         *
-         * @param data
-         *            The byte array to send to Apache FLume
-         */
-        public void sendDataToFlume(byte[] data) {
-            Event event = EventBuilder.withBody(data);
-
-            try {
-                client.append(event);
-
-            } catch (EventDeliveryException e) {
-                // clean up and recreate the client
-                client.close();
-                client = null;
-                client = RpcClientFactory.getDefaultInstance(hostname, port);
-            }
-        }
-
+    @Override
+    public void open(Configuration config) {
+        client = new FlumeRpcClient(host, port);
+        client.init();
     }
 
     @Override
     public void close() {
-        client.client.close();
+        if (client == null) return;
+        client.close();
     }
 
-    @Override
-    public void open(Configuration config) {
-        client = new FlinkRpcClientFacade();
-        client.init(host, port);
-    }
+
 }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
new file mode 100644
index 0000000..7bab666
--- /dev/null
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeRpcClientTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.flume;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FlumeRpcClientTest {
+
+    public FlumeRpcClient createGoodClient() {
+        return new FlumeRpcClient("172.25.0.3", 44444);
+    }
+
+
+    @Test
+    public void testInitClientMustFail() {
+        FlumeRpcClient client = new FlumeRpcClient("172.25.0.3", 44445);
+        Assertions.assertThrows(RuntimeException.class, () -> client.init(), "client start");
+    }
+
+    @Test
+    public void testSendStringData() {
+        FlumeRpcClient client = createGoodClient();
+        boolean init = client.init();
+        Assertions.assertTrue(init, "client not start");
+
+        boolean send = client.sendData("xpto");
+        Assertions.assertTrue(send, "data not send");
+
+    }
+
+    @Test
+    public void testSendBytesData() {
+        FlumeRpcClient client = createGoodClient();
+        boolean init = client.init();
+        Assertions.assertTrue(init, "client not start");
+
+        boolean send = client.sendData("xpto".getBytes());
+        Assertions.assertTrue(send, "data not send");
+
+    }
+
+    @Test
+    public void testSendDataWhenConnectionClosed() {
+        FlumeRpcClient client = createGoodClient();
+        boolean init = client.init();
+        Assertions.assertTrue(init, "client not start");
+        client.close();
+
+        boolean send = client.sendData("xpto");
+        Assertions.assertTrue(send, "data not send");
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
new file mode 100644
index 0000000..9d87642
--- /dev/null
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+
+public class FlumeSinkTest {
+
+
+    @Test
+    public void testSink() throws Exception {
+        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        environment.fromElements("string1", "string2")
+                .addSink(new FlumeSink<>("172.25.0.3", 44444, new SimpleStringSchema()));
+
+        tryExecute(environment, "FlumeTest");
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/flink-connector-flume/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/test/resources/log4j.properties b/flink-connector-flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..15efe08
--- /dev/null
+++ b/flink-connector-flume/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=WARN, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/72cbe807/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 80766be..c40e194 100644
--- a/pom.xml
+++ b/pom.xml
@@ -708,7 +708,6 @@
         </plugins>
       </build>
     </profile>
-
     <profile>
       <id>test-java-home</id>
       <activation>


[2/3] bahir-flink git commit: Disable Flume tests that dependes on docker

Posted by lr...@apache.org.
Disable Flume tests that dependes on docker


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/f6c63621
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/f6c63621
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/f6c63621

Branch: refs/heads/master
Commit: f6c63621088e269d1b7c2c841cfcdcb54f212d04
Parents: 72cbe80
Author: Luciano Resende <lr...@apache.org>
Authored: Wed Nov 7 12:28:04 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Nov 7 12:28:04 2018 -0800

----------------------------------------------------------------------
 flink-connector-flume/pom.xml                                 | 7 +++++++
 .../flink/streaming/connectors/flume/FlumeSinkTest.java       | 1 -
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f6c63621/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connector-flume/pom.xml b/flink-connector-flume/pom.xml
index c202c6d..f8be20f 100644
--- a/flink-connector-flume/pom.xml
+++ b/flink-connector-flume/pom.xml
@@ -73,6 +73,13 @@ under the License.
 		<plugins>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<configuration>
+					<skipTests>true</skipTests>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
 				<executions>
 					<execution>

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/f6c63621/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
index 9d87642..60a2b26 100644
--- a/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
+++ b/flink-connector-flume/src/test/java/org/apache/flink/streaming/connectors/flume/FlumeSinkTest.java
@@ -24,7 +24,6 @@ import static org.apache.flink.test.util.TestUtils.tryExecute;
 
 public class FlumeSinkTest {
 
-
     @Test
     public void testSink() throws Exception {
         StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();


[3/3] bahir-flink git commit: [BAHIR-178] Added option to create new InfluxDb database

Posted by lr...@apache.org.
[BAHIR-178] Added option to create new InfluxDb database

Closes #34


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/898e913f
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/898e913f
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/898e913f

Branch: refs/heads/master
Commit: 898e913fe9bdc9837e6f5a9be8d80e41dc1ea020
Parents: f6c6362
Author: Wojciech Luczkow <wo...@comarch.com>
Authored: Tue Sep 18 11:12:37 2018 +0200
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Nov 7 12:28:34 2018 -0800

----------------------------------------------------------------------
 .../connectors/influxdb/InfluxDBConfig.java       | 18 ++++++++++++++++++
 .../connectors/influxdb/InfluxDBSink.java         |  7 ++++++-
 2 files changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/898e913f/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
index 9c1220d..eeafb7a 100644
--- a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBConfig.java
@@ -39,6 +39,7 @@ public class InfluxDBConfig implements Serializable {
     private int flushDuration;
     private TimeUnit flushDurationTimeUnit;
     private boolean enableGzip;
+    private boolean createDatabase;
 
     public InfluxDBConfig(InfluxDBConfig.Builder builder) {
         Preconditions.checkArgument(builder != null, "InfluxDBConfig builder can not be null");
@@ -52,6 +53,7 @@ public class InfluxDBConfig implements Serializable {
         this.flushDuration = builder.getFlushDuration();
         this.flushDurationTimeUnit = builder.getFlushDurationTimeUnit();
         this.enableGzip = builder.isEnableGzip();
+        this.createDatabase = builder.isCreateDatabase();
     }
 
     public String getUrl() {
@@ -86,6 +88,8 @@ public class InfluxDBConfig implements Serializable {
         return enableGzip;
     }
 
+    public boolean isCreateDatabase() { return createDatabase; }
+
     /**
      * Creates a new {@link InfluxDBConfig.Builder} instance.
      * <p/>
@@ -114,6 +118,7 @@ public class InfluxDBConfig implements Serializable {
         private int flushDuration = DEFAULT_FLUSH_DURATION;
         private TimeUnit flushDurationTimeUnit = TimeUnit.MILLISECONDS;
         private boolean enableGzip = false;
+        private boolean createDatabase = false;
 
         /**
          * Creates a builder
@@ -213,6 +218,17 @@ public class InfluxDBConfig implements Serializable {
         }
 
         /**
+         * Make InfluxDb sink create new database
+         *
+         * @param createDatabase createDatabase switch value
+         * @return this Builder to use it fluent
+         */
+        public InfluxDBConfig.Builder createDatabase(boolean createDatabase) {
+            this.createDatabase = createDatabase;
+            return this;
+        }
+
+        /**
          * Builds InfluxDBConfig.
          *
          * @return the InfluxDBConfig instance.
@@ -253,5 +269,7 @@ public class InfluxDBConfig implements Serializable {
         public boolean isEnableGzip() {
             return enableGzip;
         }
+
+        public boolean isCreateDatabase() { return createDatabase; }
     }
 }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/898e913f/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
----------------------------------------------------------------------
diff --git a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
index e7f8916..61de76a 100644
--- a/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
+++ b/flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java
@@ -55,7 +55,12 @@ public class InfluxDBSink extends RichSinkFunction<InfluxDBPoint> {
         influxDBClient = InfluxDBFactory.connect(influxDBConfig.getUrl(), influxDBConfig.getUsername(), influxDBConfig.getPassword());
 
         if (!influxDBClient.databaseExists(influxDBConfig.getDatabase())) {
-            throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!");
+            if(influxDBConfig.isCreateDatabase()) {
+                influxDBClient.createDatabase(influxDBConfig.getDatabase());
+            }
+            else {
+                throw new RuntimeException("This " + influxDBConfig.getDatabase() + " database does not exist!");
+            }
         }
 
         influxDBClient.setDatabase(influxDBConfig.getDatabase());