You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/06 13:58:21 UTC

[4/6] flink git commit: [streaming] Socket Client Sink propagates exceptions

[streaming] Socket Client Sink propagates exceptions

Closes #789


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3402c0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3402c0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3402c0c

Branch: refs/heads/master
Commit: e3402c0caccd6cc3bb85529909e23d85946efc17
Parents: f72e5c8
Author: mbalassi <mb...@apache.org>
Authored: Thu Jun 4 15:22:13 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat Jun 6 13:56:54 2015 +0200

----------------------------------------------------------------------
 .../api/functions/sink/SocketClientSink.java         | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3402c0c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 3fd2678..da8fd7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -24,8 +24,6 @@ import java.net.Socket;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
@@ -35,8 +33,6 @@ import org.slf4j.LoggerFactory;
 public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
 	private final String hostName;
 	private final int port;
 	private final SerializationSchema<IN, byte[]> schema;
@@ -65,7 +61,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 			client = new Socket(hostName, port);
 			outputStream = client.getOutputStream();
 		} catch (IOException e) {
-			throw new RuntimeException(e);
+			throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
 		}
 		dataOutputStream = new DataOutputStream(outputStream);
 	}
@@ -82,11 +78,8 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 		try {
 			dataOutputStream.write(msg);
 		} catch (IOException e) {
-			if(LOG.isErrorEnabled()){
-				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
-			}
-			throw new RuntimeException("Cannot send message \"" + value.toString() +
-					"\" to socket server at " + hostName + ":" + port, e);
+			throw new RuntimeException("Cannot send message " + value.toString() +
+					" to socket server at " + hostName + ":" + port, e);
 		}
 	}
 
@@ -105,7 +98,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 				try {
 					client.close();
 				} catch (IOException e) {
-					LOG.error("Cannot close connection with socket server at "
+					throw new RuntimeException("Cannot close connection with socket server at "
 							+ hostName + ":" + port, e);
 				}
 			}