You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/06 13:58:21 UTC
[4/6] flink git commit: [streaming] Socket Client Sink propagates
exceptions
[streaming] Socket Client Sink propagates exceptions
Closes #789
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3402c0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3402c0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3402c0c
Branch: refs/heads/master
Commit: e3402c0caccd6cc3bb85529909e23d85946efc17
Parents: f72e5c8
Author: mbalassi <mb...@apache.org>
Authored: Thu Jun 4 15:22:13 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat Jun 6 13:56:54 2015 +0200
----------------------------------------------------------------------
.../api/functions/sink/SocketClientSink.java | 15 ++++-----------
1 file changed, 4 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e3402c0c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 3fd2678..da8fd7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -24,8 +24,6 @@ import java.net.Socket;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
@@ -35,8 +33,6 @@ import org.slf4j.LoggerFactory;
public class SocketClientSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
private final String hostName;
private final int port;
private final SerializationSchema<IN, byte[]> schema;
@@ -65,7 +61,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
client = new Socket(hostName, port);
outputStream = client.getOutputStream();
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
}
dataOutputStream = new DataOutputStream(outputStream);
}
@@ -82,11 +78,8 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
try {
dataOutputStream.write(msg);
} catch (IOException e) {
- if(LOG.isErrorEnabled()){
- LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
- }
- throw new RuntimeException("Cannot send message \"" + value.toString() +
- "\" to socket server at " + hostName + ":" + port, e);
+ throw new RuntimeException("Cannot send message " + value.toString() +
+ " to socket server at " + hostName + ":" + port, e);
}
}
@@ -105,7 +98,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
try {
client.close();
} catch (IOException e) {
- LOG.error("Cannot close connection with socket server at "
+ throw new RuntimeException("Cannot close connection with socket server at "
+ hostName + ":" + port, e);
}
}