You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:56 UTC
[19/51] [abbrv] git commit: [streaming] connectors logging and error handling fixed
[streaming] connectors logging and error handling fixed
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7c5bc3c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7c5bc3c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7c5bc3c5
Branch: refs/heads/master
Commit: 7c5bc3c5f5323de4a73aaa8f6864a441e64a00ef
Parents: a06f9d1
Author: jfeher <fe...@gmail.com>
Authored: Tue Jul 29 16:20:12 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200
----------------------------------------------------------------------
.../streaming/connectors/flume/FlumeSink.java | 29 +++++++++------
.../streaming/connectors/flume/FlumeSource.java | 39 ++++++++++----------
.../connectors/flume/FlumeTopology.java | 7 ++--
.../streaming/connectors/kafka/KafkaSink.java | 3 +-
.../connectors/kafka/KafkaTopology.java | 16 +++-----
.../streaming/connectors/rabbitmq/RMQSink.java | 12 +++++-
.../connectors/rabbitmq/RMQSource.java | 24 ++++++------
.../connectors/rabbitmq/RMQTopology.java | 5 +--
8 files changed, 72 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 27ce546..3379fcb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -19,19 +19,23 @@
package org.apache.flink.streaming.connectors.flume;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
-
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(RMQSource.class);
+
private transient FlinkRpcClientFacade client;
boolean initDone = false;
String host;
@@ -53,7 +57,7 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
}
byte[] data = serialize(tuple);
- if(!closeWithoutSend){
+ if (!closeWithoutSend) {
client.sendDataToFlume(data);
}
if (sendAndClose) {
@@ -74,22 +78,23 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
this.hostname = hostname;
this.port = port;
int initCounter = 0;
- while(true){
- if(initCounter >= 90){
+ while (true) {
+ if (initCounter >= 90) {
System.exit(1);
}
- try{
+ try {
this.client = RpcClientFactory.getDefaultInstance(hostname, port);
- }
- catch(FlumeException e){
+ } catch (FlumeException e) {
// Wait one second if the connection failed before the next try
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
- e1.printStackTrace();
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Interrupted while trying to connect " + port + " at " + host);
+ }
}
}
- if(client != null){
+ if (client != null) {
break;
}
initCounter++;
@@ -120,10 +125,10 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
public void sendAndClose() {
sendAndClose = true;
}
-
- public void closeWithoutSend(){
+
+ public void closeWithoutSend() {
client.close();
- closeWithoutSend=true;
+ closeWithoutSend = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 966c881..5b90e14 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -36,15 +36,15 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
String host;
String port;
-
- FlumeSource(String host, int port){
+
+ FlumeSource(String host, int port) {
this.host = host;
this.port = Integer.toString(port);
}
-
+
public class MyAvroSource extends AvroSource {
Collector<IN> collector;
-
+
@Override
public Status append(AvroFlumeEvent avroEvent) {
collect(avroEvent);
@@ -59,30 +59,31 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
return Status.OK;
}
-
- private void collect(AvroFlumeEvent avroEvent){
+
+ private void collect(AvroFlumeEvent avroEvent) {
byte[] b = avroEvent.getBody().array();
IN tuple = FlumeSource.this.deserialize(b);
- if(!closeWithoutSend){
+ if (!closeWithoutSend) {
collector.collect(tuple);
}
- if(sendAndClose){
+ if (sendAndClose) {
sendDone = true;
}
}
-
+
}
MyAvroSource avroSource;
private volatile boolean closeWithoutSend = false;
private boolean sendAndClose = false;
private volatile boolean sendDone = false;
+
public abstract IN deserialize(byte[] msg);
-
- public void configureAvroSource(Collector<IN> collector){
+
+ public void configureAvroSource(Collector<IN> collector) {
avroSource = new MyAvroSource();
- avroSource.collector=collector;
+ avroSource.collector = collector;
Context context = new Context();
context.put("port", port);
context.put("bind", host);
@@ -90,24 +91,24 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
ChannelProcessor cp = new ChannelProcessor(null);
avroSource.setChannelProcessor(cp);
}
-
+
@Override
public void invoke(Collector<IN> collector) throws Exception {
configureAvroSource(collector);
avroSource.start();
- while(true){
- if(closeWithoutSend || sendDone){
+ while (true) {
+ if (closeWithoutSend || sendDone) {
break;
}
}
avroSource.stop();
}
-
- public void sendAndClose(){
+
+ public void sendAndClose() {
sendAndClose = true;
}
-
- public void closeWithoutSend(){
+
+ public void closeWithoutSend() {
closeWithoutSend = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index e08837a..779a5fb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -20,7 +20,6 @@
package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.lang.SerializationUtils;
-
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.DataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -40,7 +39,8 @@ public class FlumeTopology {
try {
sendAndClose();
} catch (Exception e) {
- e.printStackTrace();
+ new RuntimeException("Error while closing RMQ connection with " + port + " at "
+ + host, e);
}
}
return SerializationUtils.serialize((String) tuple.getField(0));
@@ -70,8 +70,7 @@ public class FlumeTopology {
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(1);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
DataStream<Tuple1<String>> dataStream1 = env
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index e97cdd0..44867ef 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -64,8 +64,7 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
}
OUT out = serialize(tuple);
- KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(
- topicId, out);
+ KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
if (!closeWithoutSend) {
producer.send(data);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index e111feb..d605fb8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -31,8 +31,7 @@ public class KafkaTopology {
private static final long serialVersionUID = 1L;
@Override
- public void invoke(Collector<Tuple1<String>> collector)
- throws Exception {
+ public void invoke(Collector<Tuple1<String>> collector) throws Exception {
for (int i = 0; i < 10; i++) {
collector.collect(new Tuple1<String>(Integer.toString(i)));
}
@@ -44,8 +43,7 @@ public class KafkaTopology {
public static final class MyKafkaSource extends KafkaSource<Tuple1<String>> {
private static final long serialVersionUID = 1L;
- public MyKafkaSource(String zkQuorum, String groupId, String topicId,
- int numThreads) {
+ public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
super(zkQuorum, groupId, topicId, numThreads);
}
@@ -60,8 +58,7 @@ public class KafkaTopology {
}
- public static final class MyKafkaSink extends
- KafkaSink<Tuple1<String>, String> {
+ public static final class MyKafkaSink extends KafkaSink<Tuple1<String>, String> {
private static final long serialVersionUID = 1L;
public MyKafkaSink(String topicId, String brokerAddr) {
@@ -82,19 +79,18 @@ public class KafkaTopology {
public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(1);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
DataStream<Tuple1<String>> stream1 = env
- .addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
+ .addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
@SuppressWarnings("unused")
DataStream<Tuple1<String>> stream2 = env
.addSource(new MySource())
.addSink(new MyKafkaSink("test", "localhost:9092"));
-
+
env.execute();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 5440ea7..663fc13 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -25,11 +25,16 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
private static final long serialVersionUID = 1L;
+
+ private static final Log LOG = LogFactory.getLog(RMQSource.class);
+
private boolean sendAndClose = false;
private boolean closeWithoutSend = false;
@@ -72,7 +77,9 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
channel.basicPublish("", QUEUE_NAME, null, msg);
}
} catch (IOException e) {
- e.printStackTrace();
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot send RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+ }
}
if (sendAndClose) {
@@ -87,7 +94,8 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
channel.close();
connection.close();
} catch (IOException e) {
- e.printStackTrace();
+ new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
+ + HOST_NAME, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 345d884..4fd2235 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -21,12 +21,13 @@ package org.apache.flink.streaming.connectors.rabbitmq;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
-import com.rabbitmq.client.ShutdownSignalException;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.function.source.SourceFunction;
@@ -35,6 +36,8 @@ import org.apache.flink.util.Collector;
public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(RMQSource.class);
+
private final String QUEUE_NAME;
private final String HOST_NAME;
private boolean closeWithoutSend = false;
@@ -63,6 +66,8 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException e) {
+ new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
+ + HOST_NAME, e);
}
}
@@ -74,14 +79,10 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
try {
delivery = consumer.nextDelivery();
- } catch (ShutdownSignalException e) {
- e.printStackTrace();
- break;
- } catch (ConsumerCancelledException e) {
- e.printStackTrace();
- break;
- } catch (InterruptedException e) {
- e.printStackTrace();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot receive RMQ message " + QUEUE_NAME + " at " + HOST_NAME);
+ }
}
outTuple = deserialize(delivery.getBody());
@@ -98,7 +99,8 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
try {
connection.close();
} catch (IOException e) {
- e.printStackTrace();
+ new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
+ + HOST_NAME, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c5bc3c5/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index f84505b..828c2fa 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -68,8 +68,7 @@ public class RMQTopology {
public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(1);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
DataStream<Tuple1<String>> dataStream1 = env
@@ -78,7 +77,7 @@ public class RMQTopology {
@SuppressWarnings("unused")
DataStream<Tuple1<String>> dataStream2 = env
- .fromElements("one", "two", "three", "four", "five", "q")
+ .fromElements("one", "two", "three", "four", "five", "q")
.addSink(new MyRMQSink("localhost", "hello"));
env.execute();