You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:56 UTC

[19/51] [abbrv] git commit: [streaming] connectors logging and error handling fixed

[streaming] connectors logging and error handling fixed


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7c5bc3c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7c5bc3c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7c5bc3c5

Branch: refs/heads/master
Commit: 7c5bc3c5f5323de4a73aaa8f6864a441e64a00ef
Parents: a06f9d1
Author: jfeher <fe...@gmail.com>
Authored: Tue Jul 29 16:20:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   | 29 +++++++++------
 .../streaming/connectors/flume/FlumeSource.java | 39 ++++++++++----------
 .../connectors/flume/FlumeTopology.java         |  7 ++--
 .../streaming/connectors/kafka/KafkaSink.java   |  3 +-
 .../connectors/kafka/KafkaTopology.java         | 16 +++-----
 .../streaming/connectors/rabbitmq/RMQSink.java  | 12 +++++-
 .../connectors/rabbitmq/RMQSource.java          | 24 ++++++------
 .../connectors/rabbitmq/RMQTopology.java        |  5 +--
 8 files changed, 72 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 27ce546..3379fcb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -19,19 +19,23 @@
 
 package org.apache.flink.streaming.connectors.flume;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.event.EventBuilder;
-
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
 
 public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
+	private static final Log LOG = LogFactory.getLog(RMQSource.class);
+	
 	private transient FlinkRpcClientFacade client;
 	boolean initDone = false;
 	String host;
@@ -53,7 +57,7 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 		}
 
 		byte[] data = serialize(tuple);
-		if(!closeWithoutSend){
+		if (!closeWithoutSend) {
 			client.sendDataToFlume(data);
 		}
 		if (sendAndClose) {
@@ -74,22 +78,23 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 			this.hostname = hostname;
 			this.port = port;
 			int initCounter = 0;
-			while(true){
-				if(initCounter >= 90){
+			while (true) {
+				if (initCounter >= 90) {
 					System.exit(1);
 				}
-				try{
+				try {
 					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
-				}
-				catch(FlumeException e){
+				} catch (FlumeException e) {
 					// Wait one second if the connection failed before the next try
 					try {
 						Thread.sleep(1000);
 					} catch (InterruptedException e1) {
-						e1.printStackTrace();
+						if (LOG.isErrorEnabled()) {
+							LOG.error("Interrupted while trying to connect " + port + " at " + host);
+						}
 					}
 				}
-				if(client != null){
+				if (client != null) {
 					break;
 				}
 				initCounter++;
@@ -120,10 +125,10 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 	public void sendAndClose() {
 		sendAndClose = true;
 	}
-	
-	public void closeWithoutSend(){
+
+	public void closeWithoutSend() {
 		client.close();
-		closeWithoutSend=true;
+		closeWithoutSend = true;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 966c881..5b90e14 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -36,15 +36,15 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 
 	String host;
 	String port;
-	
-	FlumeSource(String host, int port){
+
+	FlumeSource(String host, int port) {
 		this.host = host;
 		this.port = Integer.toString(port);
 	}
-	
+
 	public class MyAvroSource extends AvroSource {
 		Collector<IN> collector;
-		
+
 		@Override
 		public Status append(AvroFlumeEvent avroEvent) {
 			collect(avroEvent);
@@ -59,30 +59,31 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 
 			return Status.OK;
 		}
-		
-		private void collect(AvroFlumeEvent avroEvent){
+
+		private void collect(AvroFlumeEvent avroEvent) {
 			byte[] b = avroEvent.getBody().array();
 			IN tuple = FlumeSource.this.deserialize(b);
-			if(!closeWithoutSend){
+			if (!closeWithoutSend) {
 				collector.collect(tuple);
 			}
-			if(sendAndClose){
+			if (sendAndClose) {
 				sendDone = true;
 			}
 		}
-		
+
 	}
 
 	MyAvroSource avroSource;
 	private volatile boolean closeWithoutSend = false;
 	private boolean sendAndClose = false;
 	private volatile boolean sendDone = false;
+
 	public abstract IN deserialize(byte[] msg);
-	
-	public void configureAvroSource(Collector<IN> collector){
+
+	public void configureAvroSource(Collector<IN> collector) {
 
 		avroSource = new MyAvroSource();
-		avroSource.collector=collector;
+		avroSource.collector = collector;
 		Context context = new Context();
 		context.put("port", port);
 		context.put("bind", host);
@@ -90,24 +91,24 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 		ChannelProcessor cp = new ChannelProcessor(null);
 		avroSource.setChannelProcessor(cp);
 	}
-	
+
 	@Override
 	public void invoke(Collector<IN> collector) throws Exception {
 		configureAvroSource(collector);
 		avroSource.start();
-		while(true){
-			if(closeWithoutSend || sendDone){
+		while (true) {
+			if (closeWithoutSend || sendDone) {
 				break;
 			}
 		}
 		avroSource.stop();
 	}
-	
-	public void sendAndClose(){
+
+	public void sendAndClose() {
 		sendAndClose = true;
 	}
-	
-	public void closeWithoutSend(){
+
+	public void closeWithoutSend() {
 		closeWithoutSend = true;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index e08837a..779a5fb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -20,7 +20,6 @@
 package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.commons.lang.SerializationUtils;
-
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -40,7 +39,8 @@ public class FlumeTopology {
 				try {
 					sendAndClose();
 				} catch (Exception e) {
-					e.printStackTrace();
+					new RuntimeException("Error while closing RMQ connection with " + port + " at "
+							+ host, e);
 				}
 			}
 			return SerializationUtils.serialize((String) tuple.getField(0));
@@ -70,8 +70,7 @@ public class FlumeTopology {
 
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<String>> dataStream1 = env

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index e97cdd0..44867ef 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -64,8 +64,7 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
 		}
 
 		OUT out = serialize(tuple);
-		KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(
-				topicId, out);
+		KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
 
 		if (!closeWithoutSend) {
 			producer.send(data);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index e111feb..d605fb8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -31,8 +31,7 @@ public class KafkaTopology {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<String>> collector)
-				throws Exception {
+		public void invoke(Collector<Tuple1<String>> collector) throws Exception {
 			for (int i = 0; i < 10; i++) {
 				collector.collect(new Tuple1<String>(Integer.toString(i)));
 			}
@@ -44,8 +43,7 @@ public class KafkaTopology {
 	public static final class MyKafkaSource extends KafkaSource<Tuple1<String>> {
 		private static final long serialVersionUID = 1L;
 
-		public MyKafkaSource(String zkQuorum, String groupId, String topicId,
-				int numThreads) {
+		public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
 			super(zkQuorum, groupId, topicId, numThreads);
 		}
 
@@ -60,8 +58,7 @@ public class KafkaTopology {
 
 	}
 
-	public static final class MyKafkaSink extends
-			KafkaSink<Tuple1<String>, String> {
+	public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String> {
 		private static final long serialVersionUID = 1L;
 
 		public MyKafkaSink(String topicId, String brokerAddr) {
@@ -82,19 +79,18 @@ public class KafkaTopology {
 
 	public static void main(String[] args) {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<String>> stream1 = env
-			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1),	SOURCE_PARALELISM)
+			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
 			.print();
 
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<String>> stream2 = env
 			.addSource(new MySource())
 			.addSink(new MyKafkaSink("test", "localhost:9092"));
-		
+
 		env.execute();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 5440ea7..663fc13 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -25,11 +25,16 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
+
+	private static final Log LOG = LogFactory.getLog(RMQSource.class);
+
 	private boolean sendAndClose = false;
 	private boolean closeWithoutSend = false;
 
@@ -72,7 +77,9 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 				channel.basicPublish("", QUEUE_NAME, null, msg);
 			}
 		} catch (IOException e) {
-			e.printStackTrace();
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+			}
 		}
 
 		if (sendAndClose) {
@@ -87,7 +94,8 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 			channel.close();
 			connection.close();
 		} catch (IOException e) {
-			e.printStackTrace();
+			new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
+					+ HOST_NAME, e);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 345d884..4fd2235 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -21,12 +21,13 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.ConsumerCancelledException;
 import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.ShutdownSignalException;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
@@ -35,6 +36,8 @@ import org.apache.flink.util.Collector;
 public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
+	private static final Log LOG = LogFactory.getLog(RMQSource.class);
+
 	private final String QUEUE_NAME;
 	private final String HOST_NAME;
 	private boolean closeWithoutSend = false;
@@ -63,6 +66,8 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 			consumer = new QueueingConsumer(channel);
 			channel.basicConsume(QUEUE_NAME, true, consumer);
 		} catch (IOException e) {
+			new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
+					+ HOST_NAME, e);
 		}
 	}
 
@@ -74,14 +79,10 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 
 			try {
 				delivery = consumer.nextDelivery();
-			} catch (ShutdownSignalException e) {
-				e.printStackTrace();
-				break;
-			} catch (ConsumerCancelledException e) {
-				e.printStackTrace();
-				break;
-			} catch (InterruptedException e) {
-				e.printStackTrace();
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+				}
 			}
 
 			outTuple = deserialize(delivery.getBody());
@@ -98,7 +99,8 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 		try {
 			connection.close();
 		} catch (IOException e) {
-			e.printStackTrace();
+			new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
+					+ HOST_NAME, e);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index f84505b..828c2fa 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -68,8 +68,7 @@ public class RMQTopology {
 
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<String>> dataStream1 = env
@@ -78,7 +77,7 @@ public class RMQTopology {
 
 		@SuppressWarnings("unused")
 		DataStream<Tuple1<String>> dataStream2 = env
-			.fromElements("one", "two",	"three", "four", "five", "q")
+			.fromElements("one", "two", "three", "four", "five", "q")
 			.addSink(new MyRMQSink("localhost", "hello"));
 
 		env.execute();