You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:03:57 UTC

[24/28] git commit: [streaming] connectors logging and deploy update

[streaming] connectors logging and deploy update


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cad38ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cad38ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cad38ede

Branch: refs/heads/master
Commit: cad38edef98f26f16f68592c0a5b4b654b446f57
Parents: 0b72112
Author: jfeher <fe...@gmail.com>
Authored: Mon Aug 25 16:15:43 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200

----------------------------------------------------------------------
 .../flink-streaming-connectors/pom.xml          | 16 ++++++++++++
 .../connectors/flume/FlumeTopology.java         | 18 ++++++++++++--
 .../connectors/kafka/KafkaTopology.java         | 17 +++++++++++--
 .../connectors/rabbitmq/RMQTopology.java        | 26 ++++++++++++++++----
 4 files changed, 68 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index c18d877..af4b56b 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -107,6 +107,22 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+			<plugin>
+            	<artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+	                <execution>
+    	                <phase>package</phase>
+                        <goals>
+        	                <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+            	    <descriptorRefs>
+                		<descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+            	</configuration>
+        	</plugin>		
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index dfb8250..38ea6ef 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -18,11 +18,14 @@
 package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 public class FlumeTopology {
-
+	private static final Log LOG = LogFactory.getLog(FlumeTopology.class);
 	public static class MyFlumeSink extends FlumeSink<String> {
 		private static final long serialVersionUID = 1L;
 
@@ -45,6 +48,17 @@ public class FlumeTopology {
 
 	}
 
+	public static final class MyFlumePrintSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String value) {
+			LOG.info("String: <" + value + "> arrived from Flume");
+			
+		}
+		
+	}
+	
 	public static class MyFlumeSource extends FlumeSource<String> {
 		private static final long serialVersionUID = 1L;
 
@@ -70,7 +84,7 @@ public class FlumeTopology {
 		@SuppressWarnings("unused")
 		DataStream<String> dataStream1 = env
 			.addSource(new MyFlumeSource("localhost", 41414))
-			.print();
+			.addSink(new MyFlumePrintSink());
 
 		@SuppressWarnings("unused")
 		DataStream<String> dataStream2 = env

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index ef03f0d..2c89471 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -18,13 +18,17 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 public class KafkaTopology {
-
+	private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+	
 	public static final class MySource implements SourceFunction<Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
@@ -72,6 +76,15 @@ public class KafkaTopology {
 		}
 
 	}
+	
+	public static final class MyKafkaPrintSink implements SinkFunction<Tuple1<String>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple1<String> value) {
+			LOG.info("String: " + value + " arrived from Kafka");
+		}
+	}
 
 	private static final int SOURCE_PARALELISM = 1;
 
@@ -82,7 +95,7 @@ public class KafkaTopology {
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<String>> stream1 = env
 			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
-			.print();
+			.addSink(new MyKafkaPrintSink());
 
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<String>> stream2 = env

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cad38ede/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index b7083ff..6cdda17 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -17,12 +17,17 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
-import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.KafkaTopology;
 
 public class RMQTopology {
-
+	private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
+	
 	public static final class MyRMQSink extends RMQSink<String> {
 		public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
 			super(HOST_NAME, QUEUE_NAME);
@@ -39,7 +44,18 @@ public class RMQTopology {
 		}
 
 	}
-
+
+	public static final class MyRMQPrintSink implements SinkFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(String value) {
+			LOG.info("String: <" + value + "> arrived from RMQ");
+			
+		}
+		
+	}
+	
 	public static final class MyRMQSource extends RMQSource<String> {
 
 		public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
@@ -66,7 +82,7 @@ public class RMQTopology {
 		@SuppressWarnings("unused")
 		DataStream<String> dataStream1 = env
 			.addSource(new MyRMQSource("localhost", "hello"))
-			.print();
+			.addSink(new MyRMQPrintSink());
 
 		@SuppressWarnings("unused")
 		DataStream<String> dataStream2 = env