You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:57 UTC
[24/28] git commit: [streaming] connectors logging and deploy update
[streaming] connectors logging and deploy update
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cad38ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cad38ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cad38ede
Branch: refs/heads/master
Commit: cad38edef98f26f16f68592c0a5b4b654b446f57
Parents: 0b72112
Author: jfeher <fe...@gmail.com>
Authored: Mon Aug 25 16:15:43 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200
----------------------------------------------------------------------
.../flink-streaming-connectors/pom.xml | 16 ++++++++++++
.../connectors/flume/FlumeTopology.java | 18 ++++++++++++--
.../connectors/kafka/KafkaTopology.java | 17 +++++++++++--
.../connectors/rabbitmq/RMQTopology.java | 26 ++++++++++++++++----
4 files changed, 68 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index c18d877..af4b56b 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -107,6 +107,22 @@ under the License.
</execution>
</executions>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index dfb8250..38ea6ef 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -18,11 +18,14 @@
package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
public class FlumeTopology {
-
+ private static final Log LOG = LogFactory.getLog(FlumeTopology.class);
public static class MyFlumeSink extends FlumeSink<String> {
private static final long serialVersionUID = 1L;
@@ -45,6 +48,17 @@ public class FlumeTopology {
}
+ public static final class MyFlumePrintSink implements SinkFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(String value) {
+ LOG.info("String: <" + value + "> arrived from Flume");
+
+ }
+
+ }
+
public static class MyFlumeSource extends FlumeSource<String> {
private static final long serialVersionUID = 1L;
@@ -70,7 +84,7 @@ public class FlumeTopology {
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env
.addSource(new MyFlumeSource("localhost", 41414))
- .print();
+ .addSink(new MyFlumePrintSink());
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index ef03f0d..2c89471 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -18,13 +18,17 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
public class KafkaTopology {
-
+ private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+
public static final class MySource implements SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@@ -72,6 +76,15 @@ public class KafkaTopology {
}
}
+
+ public static final class MyKafkaPrintSink implements SinkFunction<Tuple1<String>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(Tuple1<String> value) {
+ LOG.info("String: " + value + " arrived from Kafka");
+ }
+ }
private static final int SOURCE_PARALELISM = 1;
@@ -82,7 +95,7 @@ public class KafkaTopology {
@SuppressWarnings("unused")
DataStream<Tuple1<String>> stream1 = env
.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
- .print();
+ .addSink(new MyKafkaPrintSink());
@SuppressWarnings("unused")
DataStream<Tuple1<String>> stream2 = env
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index b7083ff..6cdda17 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -17,12 +17,17 @@
package org.apache.flink.streaming.connectors.rabbitmq;
-import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.KafkaTopology;
public class RMQTopology {
-
+ private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+
public static final class MyRMQSink extends RMQSink<String> {
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
@@ -39,7 +44,18 @@ public class RMQTopology {
}
}
-
+
+ public static final class MyRMQPrintSink implements SinkFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(String value) {
+ LOG.info("String: <" + value + "> arrived from RMQ");
+
+ }
+
+ }
+
public static final class MyRMQSource extends RMQSource<String> {
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
@@ -66,7 +82,7 @@ public class RMQTopology {
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"))
- .print();
+ .addSink(new MyRMQPrintSink());
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env