You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:59 UTC

[22/51] [abbrv] git commit: [streaming] Added Flume connector and updated connectors

[streaming] Added Flume connector and updated connectors


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a06f9d10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a06f9d10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a06f9d10

Branch: refs/heads/master
Commit: a06f9d10e6ea9f91dc6723af483ba9328d4e1621
Parents: 7ca778b
Author: jfeher <fe...@gmail.com>
Authored: Tue Jul 29 13:51:34 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200

----------------------------------------------------------------------
 .../flink-streaming-connectors/pom.xml          |  11 +-
 .../streaming/connectors/flume/FlumeSink.java   | 129 +++++++++++++++++++
 .../streaming/connectors/flume/FlumeSource.java | 114 ++++++++++++++++
 .../connectors/flume/FlumeTopology.java         |  88 +++++++++++++
 .../connectors/kafka/KafkaProducer.java         |  75 -----------
 .../streaming/connectors/kafka/KafkaSink.java   |  24 +++-
 .../streaming/connectors/kafka/KafkaSource.java |  34 ++---
 .../connectors/kafka/KafkaTopology.java         |  38 +++---
 .../streaming/connectors/rabbitmq/RMQSink.java  |  74 +++++------
 .../connectors/rabbitmq/RMQSource.java          |  52 ++++----
 .../connectors/rabbitmq/RMQTopology.java        |  29 ++---
 11 files changed, 472 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
index ac15d55..8db610f 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/pom.xml
@@ -48,14 +48,19 @@ under the License.
 			<version>0.8.0</version>
 		</dependency>
 
-
-	    <dependency>
+		<dependency>
 			<groupId>com.rabbitmq</groupId>
 			<artifactId>amqp-client</artifactId>
 			<version>3.3.1</version>
 		</dependency>
 
-	    <dependency>
+		<dependency>
+			<groupId>org.apache.flume</groupId>
+			<artifactId>flume-ng-core</artifactId>
+			<version>1.5.0</version>
+		</dependency>
+
+	    	<dependency>
 			<groupId>com.twitter</groupId>
 			<artifactId>hbc-core</artifactId>
 			<version>2.2.0</version>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..27ce546
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,129 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+
+public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private transient FlinkRpcClientFacade client;
+	boolean initDone = false;
+	String host;
+	int port;
+	private boolean sendAndClose = false;
+	private boolean closeWithoutSend = false;
+
+	public FlumeSink(String host, int port) {
+		this.host = host;
+		this.port = port;
+	}
+
+	@Override
+	public void invoke(IN tuple) {
+
+		if (!initDone) {
+			client = new FlinkRpcClientFacade();
+			client.init(host, port);
+		}
+
+		byte[] data = serialize(tuple);
+		if(!closeWithoutSend){
+			client.sendDataToFlume(data);
+		}
+		if (sendAndClose) {
+			client.close();
+		}
+
+	}
+
+	public abstract byte[] serialize(IN tuple);
+
+	private class FlinkRpcClientFacade {
+		private RpcClient client;
+		private String hostname;
+		private int port;
+
+		public void init(String hostname, int port) {
+			// Setup the RPC connection
+			this.hostname = hostname;
+			this.port = port;
+			int initCounter = 0;
+			while(true){
+				if(initCounter >= 90){
+					System.exit(1);
+				}
+				try{
+					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+				}
+				catch(FlumeException e){
+					// Wait one second if the connection failed before the next try
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e1) {
+						e1.printStackTrace();
+					}
+				}
+				if(client != null){
+					break;
+				}
+				initCounter++;
+			}
+			initDone = true;
+		}
+
+		public void sendDataToFlume(byte[] data) {
+			Event event = EventBuilder.withBody(data);
+
+			try {
+				client.append(event);
+
+			} catch (EventDeliveryException e) {
+				// clean up and recreate the client
+				client.close();
+				client = null;
+				client = RpcClientFactory.getDefaultInstance(hostname, port);
+			}
+		}
+
+		public void close() {
+			client.close();
+		}
+
+	}
+
+	public void sendAndClose() {
+		sendAndClose = true;
+	}
+	
+	public void closeWithoutSend(){
+		client.close();
+		closeWithoutSend=true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
new file mode 100644
index 0000000..966c881
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AvroSource;
+import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.flume.source.avro.Status;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	String host;
+	String port;
+	
+	FlumeSource(String host, int port){
+		this.host = host;
+		this.port = Integer.toString(port);
+	}
+	
+	public class MyAvroSource extends AvroSource {
+		Collector<IN> collector;
+		
+		@Override
+		public Status append(AvroFlumeEvent avroEvent) {
+			collect(avroEvent);
+			return Status.OK;
+		}
+
+		@Override
+		public Status appendBatch(List<AvroFlumeEvent> events) {
+			for (AvroFlumeEvent avroEvent : events) {
+				collect(avroEvent);
+			}
+
+			return Status.OK;
+		}
+		
+		private void collect(AvroFlumeEvent avroEvent){
+			byte[] b = avroEvent.getBody().array();
+			IN tuple = FlumeSource.this.deserialize(b);
+			if(!closeWithoutSend){
+				collector.collect(tuple);
+			}
+			if(sendAndClose){
+				sendDone = true;
+			}
+		}
+		
+	}
+
+	MyAvroSource avroSource;
+	private volatile boolean closeWithoutSend = false;
+	private boolean sendAndClose = false;
+	private volatile boolean sendDone = false;
+	public abstract IN deserialize(byte[] msg);
+	
+	public void configureAvroSource(Collector<IN> collector){
+
+		avroSource = new MyAvroSource();
+		avroSource.collector=collector;
+		Context context = new Context();
+		context.put("port", port);
+		context.put("bind", host);
+		avroSource.configure(context);
+		ChannelProcessor cp = new ChannelProcessor(null);
+		avroSource.setChannelProcessor(cp);
+	}
+	
+	@Override
+	public void invoke(Collector<IN> collector) throws Exception {
+		configureAvroSource(collector);
+		avroSource.start();
+		while(true){
+			if(closeWithoutSend || sendDone){
+				break;
+			}
+		}
+		avroSource.stop();
+	}
+	
+	public void sendAndClose(){
+		sendAndClose = true;
+	}
+	
+	public void closeWithoutSend(){
+		closeWithoutSend = true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
new file mode 100644
index 0000000..e08837a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.commons.lang.SerializationUtils;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+
+public class FlumeTopology {
+
+	public static class MyFlumeSink extends FlumeSink<Tuple1<String>> {
+		private static final long serialVersionUID = 1L;
+
+		public MyFlumeSink(String host, int port) {
+			super(host, port);
+		}
+
+		@Override
+		public byte[] serialize(Tuple1<String> tuple) {
+			if (tuple.f0.equals("q")) {
+				try {
+					sendAndClose();
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+			}
+			return SerializationUtils.serialize((String) tuple.getField(0));
+		}
+
+	}
+
+	public static class MyFlumeSource extends FlumeSource<Tuple1<String>> {
+		private static final long serialVersionUID = 1L;
+
+		MyFlumeSource(String host, int port) {
+			super(host, port);
+		}
+
+		@Override
+		public Tuple1<String> deserialize(byte[] msg) {
+			String s = (String) SerializationUtils.deserialize(msg);
+			Tuple1<String> out = new Tuple1<String>();
+			out.f0 = s;
+			if (s.equals("q")) {
+				closeWithoutSend();
+			}
+			return out;
+		}
+
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(1);
+
+		@SuppressWarnings("unused")
+		DataStream<Tuple1<String>> dataStream1 = env
+			.addSource(new MyFlumeSource("localhost", 41414))
+			.print();
+
+		@SuppressWarnings("unused")
+		DataStream<Tuple1<String>> dataStream2 = env
+			.fromElements("one", "two", "three", "four", "five", "q")
+			.addSink(new MyFlumeSink("localhost", 42424));
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java
deleted file mode 100644
index 00a8964..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.util.Properties;
-
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-/**
- * This is a simple kafka producer that reads local file from disk and produces
- * line streams. To use the producer, a zookeeper server and a kafka server
- * should be in service. Run the following script to start a zookeeper server:
- * bin/zookeeper-server-start.sh config/zookeeper.properties Run the following
- * script to start a kafka server: bin/kafka-server-start.sh
- * config/server.properties Run the following script to start the producer: java
- * -cp kafka-0.8/libs/*:yourJarFile.jar
- * org.apache.flink.streaming.kafka.KafkaProducer yourTopicID kafkaServerIp As
- * an example: java -cp kafka-0.8/libs/*:flink-streaming.jar
- * org.apache.flink.streaming.kafka.KafkaProducer test localhost:9092
- */
-public class KafkaProducer {
-	static kafka.javaapi.producer.Producer<Integer, String> producer;
-	static Properties props = new Properties();
-
-	public static void ProducerPrepare(String brokerAddr) {
-		props.put("serializer.class", "kafka.serializer.StringEncoder");
-		props.put("metadata.broker.list", brokerAddr);
-
-		producer = new kafka.javaapi.producer.Producer<Integer, String>(
-				new ProducerConfig(props));
-	}
-
-	public static void main(String[] args) throws Exception {
-		if (args.length == 3) {
-			String infilename = args[0];
-			String topicId = args[1];
-			String brokerAddr = args[2];
-			ProducerPrepare(brokerAddr);
-			BufferedReader reader = new BufferedReader(new FileReader(
-					infilename));
-			while (true) {
-				String line = reader.readLine();
-				if (line == null) {
-					reader.close();
-					reader = new BufferedReader(new FileReader(infilename));
-					continue;
-				}
-				producer.send(new KeyedMessage<Integer, String>(topicId, line));
-			}
-		} else {
-			System.out.println("please set filename!");
-			System.exit(-1);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index eeac961..e97cdd0 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -35,7 +35,8 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
 	static Properties props;
 	private String topicId;
 	private String brokerAddr;
-	private boolean close = false;
+	private boolean sendAndClose = false;
+	private boolean closeWithoutSend = false;
 	private boolean initDone = false;
 
 	public KafkaSink(String topicId, String brokerAddr) {
@@ -63,18 +64,27 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
 		}
 
 		OUT out = serialize(tuple);
-		KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
-		producer.send(data);
-		if (close) {
-			producer.close();
+		KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(
+				topicId, out);
+
+		if (!closeWithoutSend) {
+			producer.send(data);
 		}
 
+		if (sendAndClose) {
+			producer.close();
+		}
 	}
 
 	public abstract OUT serialize(IN tuple);
 
-	public void close() {
-		close = true;
+	public void closeWithoutSend() {
+		producer.close();
+		closeWithoutSend = true;
+	}
+
+	public void sendAndClose() {
+		sendAndClose = true;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index d34b6c3..32b11d9 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -24,21 +24,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
-
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * Source for reading messages from a Kafka queue. 
- * The source currently only support string messages.
+ * Source for reading messages from a Kafka queue. The source currently only
+ * support string messages.
  */
-public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<IN>{
+public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private final String zkQuorum;
@@ -46,7 +44,8 @@ public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<
 	private final String topicId;
 	private final int numThreads;
 	private ConsumerConnector consumer;
-	private boolean close = false;
+	private boolean closeWithoutSend = false;
+	private boolean sendAndClose = false;
 
 	IN outTuple;
 
@@ -81,20 +80,25 @@ public abstract class KafkaSource<IN extends Tuple, OUT> extends SourceFunction<
 		ConsumerIterator<byte[], byte[]> it = stream.iterator();
 
 		while (it.hasNext()) {
-			IN out=deserialize(it.next().message());
-			if(!close){
-				collector.collect(out);
+			IN out = deserialize(it.next().message());
+			if (closeWithoutSend) {
+				break;
 			}
-			else {
+			collector.collect(out);
+			if (sendAndClose) {
 				break;
 			}
 		}
 		consumer.shutdown();
 	}
-	
+
 	public abstract IN deserialize(byte[] msg);
-	
-	public void close(){
-		close=true;
+
+	public void closeWithoutSend() {
+		closeWithoutSend = true;
+	}
+
+	public void sendAndClose() {
+		sendAndClose = true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 1a77aee..e111feb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -19,10 +19,10 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
 public class KafkaTopology {
@@ -31,8 +31,8 @@ public class KafkaTopology {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
-			// TODO Auto-generated method stub
+		public void invoke(Collector<Tuple1<String>> collector)
+				throws Exception {
 			for (int i = 0; i < 10; i++) {
 				collector.collect(new Tuple1<String>(Integer.toString(i)));
 			}
@@ -41,39 +41,37 @@ public class KafkaTopology {
 		}
 	}
 
-	public static final class MyKafkaSource extends KafkaSource<Tuple1<String>, String> {
+	public static final class MyKafkaSource extends KafkaSource<Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
-		public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
+		public MyKafkaSource(String zkQuorum, String groupId, String topicId,
+				int numThreads) {
 			super(zkQuorum, groupId, topicId, numThreads);
-			// TODO Auto-generated constructor stub
 		}
 
 		@Override
 		public Tuple1<String> deserialize(byte[] msg) {
-			// TODO Auto-generated method stub
 			String s = new String(msg);
 			if (s.equals("q")) {
-				close();
+				closeWithoutSend();
 			}
 			return new Tuple1<String>(s);
 		}
 
 	}
 
-	public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String> {
+	public static final class MyKafkaSink extends
+			KafkaSink<Tuple1<String>, String> {
 		private static final long serialVersionUID = 1L;
 
 		public MyKafkaSink(String topicId, String brokerAddr) {
 			super(topicId, brokerAddr);
-			// TODO Auto-generated constructor stub
 		}
 
 		@Override
 		public String serialize(Tuple1<String> tuple) {
-			// TODO Auto-generated method stub
 			if (tuple.f0.equals("q")) {
-				close();
+				sendAndClose();
 			}
 			return tuple.f0;
 		}
@@ -84,15 +82,19 @@ public class KafkaTopology {
 
 	public static void main(String[] args) {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> stream1 = env.addSource(
-				new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM).print();
+		DataStream<Tuple1<String>> stream1 = env
+			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1),	SOURCE_PARALELISM)
+			.print();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> stream2 = env.addSource(new MySource()).addSink(
-				new MyKafkaSink("test", "localhost:9092"));
+		DataStream<Tuple1<String>> stream2 = env
+			.addSource(new MySource())
+			.addSink(new MyKafkaSink("test", "localhost:9092"));
+		
 		env.execute();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index e6e8de5..5440ea7 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,47 +21,39 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
-/**
- * Source for sending messages to a RabbitMQ queue. The source currently only
- * support string messages. Other types will be added soon.
- * 
- */
-public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
-	private static final Log LOG = LogFactory.getLog(RMQSink.class);
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
+public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
-	private boolean close = false;
+	private boolean sendAndClose = false;
+	private boolean closeWithoutSend = false;
 
-	private String queueName;
-	private String hostName;
+	private String QUEUE_NAME;
+	private String HOST_NAME;
 	private transient ConnectionFactory factory;
 	private transient Connection connection;
 	private transient Channel channel;
 	private boolean initDone = false;
 
-	public RMQSink(String hostName, String queueName) {
-		this.hostName = hostName;
-		this.queueName = queueName;
+	public RMQSink(String HOST_NAME, String QUEUE_NAME) {
+		this.HOST_NAME = HOST_NAME;
+		this.QUEUE_NAME = QUEUE_NAME;
 	}
 
 	public void initializeConnection() {
 		factory = new ConnectionFactory();
-		factory.setHost(hostName);
+		factory.setHost(HOST_NAME);
 		try {
 			connection = factory.newConnection();
 			channel = connection.createChannel();
+
 		} catch (IOException e) {
-			new RuntimeException("Cannot create RMQ connection with " + queueName + " at "
-					+ hostName, e);
+			e.printStackTrace();
 		}
 
 		initDone = true;
@@ -74,31 +66,39 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 		}
 
 		try {
-			channel.queueDeclare(queueName, false, false, false, null);
+			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 			byte[] msg = serialize(tuple);
-			channel.basicPublish("", queueName, null, msg);
-		} catch (IOException e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send RMQ message to " + queueName + " at " + hostName);
+			if (!closeWithoutSend) {
+				channel.basicPublish("", QUEUE_NAME, null, msg);
 			}
+		} catch (IOException e) {
+			e.printStackTrace();
 		}
 
-		if (close) {
-			try {
-				channel.close();
-				connection.close();
-			} catch (IOException e) {
-				if (LOG.isWarnEnabled()) {
-					LOG.warn("Cannot close RMQ connection: " + queueName + " at " + hostName);
-				}
-			}
+		if (sendAndClose) {
+			closeChannel();
 		}
 	}
 
 	public abstract byte[] serialize(Tuple t);
 
-	public void close() {
-		close = true;
+	private void closeChannel() {
+		try {
+			channel.close();
+			connection.close();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+
+	}
+
+	public void closeWithoutSend() {
+		closeChannel();
+		closeWithoutSend = true;
+	}
+
+	public void sendAndClose() {
+		sendAndClose = true;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 4b197e3..345d884 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -21,31 +21,24 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.ConsumerCancelledException;
 import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownSignalException;
 
-/**
- * Source for reading messages from a RabbitMQ queue. The source currently only
- * support string messages. Other types will be added soon.
- * 
- */
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
 
 public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
-	private static final Log LOG = LogFactory.getLog(RMQSource.class);
-
 	private static final long serialVersionUID = 1L;
 
 	private final String QUEUE_NAME;
 	private final String HOST_NAME;
-	private boolean close = false;
+	private boolean closeWithoutSend = false;
+	private boolean sendAndClose = false;
 
 	private transient ConnectionFactory factory;
 	private transient Connection connection;
@@ -70,8 +63,6 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 			consumer = new QueueingConsumer(channel);
 			channel.basicConsume(QUEUE_NAME, true, consumer);
 		} catch (IOException e) {
-			new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
-					+ HOST_NAME, e);
 		}
 	}
 
@@ -79,20 +70,29 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 	public void invoke(Collector<IN> collector) throws Exception {
 		initializeConnection();
 
-		while (!close) {
+		while (true) {
 
 			try {
 				delivery = consumer.nextDelivery();
-			} catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
-				}
+			} catch (ShutdownSignalException e) {
+				e.printStackTrace();
+				break;
+			} catch (ConsumerCancelledException e) {
+				e.printStackTrace();
+				break;
+			} catch (InterruptedException e) {
+				e.printStackTrace();
 			}
 
 			outTuple = deserialize(delivery.getBody());
-			if (!close){
+			if (closeWithoutSend) {
+				break;
+			} else {
 				collector.collect(outTuple);
 			}
+			if (sendAndClose) {
+				break;
+			}
 		}
 
 		try {
@@ -105,8 +105,12 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 
 	public abstract IN deserialize(byte[] t);
 
-	public void close() {
-		close = true;
+	public void closeWithoutSend() {
+		closeWithoutSend = true;
+	}
+
+	public void sendAndClose() {
+		sendAndClose = true;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a06f9d10/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 4b7c5dc..f84505b 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,10 +19,8 @@
 
 package org.apache.flink.streaming.connectors.rabbitmq;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.commons.lang.SerializationUtils;
+
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
@@ -33,16 +31,14 @@ public class RMQTopology {
 	public static final class MyRMQSink extends RMQSink<Tuple1<String>> {
 		public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
 			super(HOST_NAME, QUEUE_NAME);
-			// TODO Auto-generated constructor stub
 		}
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public byte[] serialize(Tuple t) {
-			// TODO Auto-generated method stub
 			if (t.getField(0).equals("q")) {
-				close();
+				sendAndClose();
 			}
 			return SerializationUtils.serialize((String) t.getField(0));
 		}
@@ -53,7 +49,6 @@ public class RMQTopology {
 
 		public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
 			super(HOST_NAME, QUEUE_NAME);
-			// TODO Auto-generated constructor stub
 		}
 
 		private static final long serialVersionUID = 1L;
@@ -64,28 +59,28 @@ public class RMQTopology {
 			Tuple1<String> out = new Tuple1<String>();
 			out.f0 = s;
 			if (s.equals("q")) {
-				close();
+				closeWithoutSend();
 			}
 			return out;
 		}
 
 	}
 
-	@SuppressWarnings("unused")
-	private static Set<String> result = new HashSet<String>();
-
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> dataStream1 = env.addSource(
-				new MyRMQSource("localhost", "hello")).print();
+		DataStream<Tuple1<String>> dataStream1 = env
+			.addSource(new MyRMQSource("localhost", "hello"))
+			.print();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> dataStream2 = env.fromElements("one", "two", "three", "four",
-				"five", "q").addSink(new MyRMQSink("localhost", "hello"));
+		DataStream<Tuple1<String>> dataStream2 = env
+			.fromElements("one", "two",	"three", "four", "five", "q")
+			.addSink(new MyRMQSink("localhost", "hello"));
 
 		env.execute();
 	}
-}
\ No newline at end of file
+}