You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 17:23:21 UTC
[1/2] flink git commit: [FLINK-2536] [streaming] Cleanups and
improvements on SocketClientSink
Repository: flink
Updated Branches:
refs/heads/master 4abdbbb1b -> b9663c407
[FLINK-2536] [streaming] Cleanups and improvements on SocketClientSink
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9663c40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9663c40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9663c40
Branch: refs/heads/master
Commit: b9663c40754f522648f33dd7f56f2a47232c3836
Parents: fd354ba
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 21 16:36:35 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 17:06:43 2015 +0200
----------------------------------------------------------------------
.../api/functions/sink/SocketClientSink.java | 291 ++++++++------
.../functions/sink/SocketClientSinkTest.java | 381 ++++++++++---------
2 files changed, 391 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b9663c40/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 39f1ec0..96bc497 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.functions.sink;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
@@ -25,170 +24,242 @@ import java.net.Socket;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
+ * <p>
+ * The sink can be set to retry message sends after the sending failed.
+ * <p>
+ * The sink can be set to 'autoflush', in which case the socket stream is flushed after every message. This
+ * significantly reduces throughput, but also decreases message latency.
*
* @param <IN> data to be written into the Socket.
*/
public class SocketClientSink<IN> extends RichSinkFunction<IN> {
- protected static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+ private static final int CONNECTION_RETRY_DELAY = 500;
+
+
+ private final SerializableObject lock = new SerializableObject();
+ private final SerializationSchema<IN, byte[]> schema;
private final String hostName;
private final int port;
- private final SerializationSchema<IN, byte[]> schema;
+ private final int maxNumRetries;
+ private final boolean autoFlush;
+
private transient Socket client;
- private transient DataOutputStream dataOutputStream;
- private long maxRetry;
- private boolean retryForever;
- private boolean isRunning;
- protected long retries;
- private final SerializableObject lock;
+ private transient OutputStream outputStream;
+
+ private int retries;
- private static final int CONNECTION_RETRY_SLEEP = 1000;
+ private volatile boolean isRunning = true;
+
+ /**
+ * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
+ * and will not auto-flush the stream.
+ *
+ * @param hostName Hostname of the server to connect to.
+ * @param port Port of the server.
+ * @param schema Schema used to serialize the data into bytes.
+ */
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+ this(hostName, port, schema, 0);
+ }
/**
- * Default constructor.
+ * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
+ * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
+ * The sink will not auto-flush the stream.
*
- * @param hostName Host of the Socket server.
- * @param port Port of the Socket.
- * @param schema Schema of the data.
+ * @param hostName Hostname of the server to connect to.
+ * @param port Port of the server.
+ * @param schema Schema used to serialize the data into bytes.
+ * @param maxNumRetries The maximum number of retries after a message send failed.
*/
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, long maxRetry) {
- this.hostName = hostName;
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, int maxNumRetries) {
+ this(hostName, port, schema, maxNumRetries, false);
+ }
+
+ /**
+ * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
+ * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
+ *
+ * @param hostName Hostname of the server to connect to.
+ * @param port Port of the server.
+ * @param schema Schema used to serialize the data into bytes.
+ * @param maxNumRetries The maximum number of retries after a message send failed.
+ * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
+ */
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema,
+ int maxNumRetries, boolean autoflush)
+ {
+ checkArgument(port > 0 && port < 65536, "port is out of range");
+ checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+
+ this.hostName = checkNotNull(hostName, "hostname must not be null");
this.port = port;
- this.schema = schema;
- this.maxRetry = maxRetry;
- this.retryForever = maxRetry < 0;
- this.isRunning = false;
- this.retries = 0;
- this.lock = new SerializableObject();
+ this.schema = checkNotNull(schema);
+ this.maxNumRetries = maxNumRetries;
+ this.autoFlush = autoflush;
}
+ // ------------------------------------------------------------------------
+ // Life cycle
+ // ------------------------------------------------------------------------
+
/**
- * Initializes the connection to Socket.
+ * Initialize the connection with the Socket in the server.
+ * @param parameters Configuration.
*/
- public void intializeConnection() {
- OutputStream outputStream;
+ @Override
+ public void open(Configuration parameters) throws Exception {
try {
- client = new Socket(hostName, port);
- outputStream = client.getOutputStream();
- isRunning = true;
- } catch (IOException e) {
- throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
+ synchronized (lock) {
+ createConnection();
+ }
+ }
+ catch (IOException e) {
+ throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
}
- dataOutputStream = new DataOutputStream(outputStream);
}
-
+
+
/**
* Called when new data arrives to the sink, and forwards it to Socket.
*
- * @param value
- * The incoming data
+ * @param value The value to write to the socket.
*/
@Override
public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
- dataOutputStream.write(msg);
- } catch (IOException e) {
- LOG.error("Cannot send message " + value +
- " to socket server at " + hostName + ":" + port + ". Caused by " + e.getMessage() +
- ". Trying to reconnect.", e);
- retries = 0;
- boolean success = false;
- while ((retries < maxRetry || retryForever) && !success && isRunning){
- try {
-
- if (dataOutputStream != null) {
- dataOutputStream.close();
+ outputStream.write(msg);
+ if (autoFlush) {
+ outputStream.flush();
+ }
+ }
+ catch (IOException e) {
+ // if no re-tries are enabled, fail immediately
+ if (maxNumRetries == 0) {
+ throw new IOException("Failed to send message '" + value + "' to socket server at "
+ + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
+ }
+
+ LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
+ ". Trying to reconnect..." , e);
+
+ // do the retries in locked scope, to guard against concurrent close() calls
+ // note that the first re-try comes immediately, without a wait!
+
+ synchronized (lock) {
+ IOException lastException = null;
+ retries = 0;
+
+ while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
+
+ // first, clean up the old resources
+ try {
+ if (outputStream != null) {
+ outputStream.close();
+ }
}
-
- if (client != null && !client.isClosed()) {
- client.close();
+ catch (IOException ee) {
+ LOG.error("Could not close output stream from failed write attempt", ee);
}
-
- retries++;
-
- client = new Socket(hostName, port);
- dataOutputStream = new DataOutputStream(client.getOutputStream());
- dataOutputStream.write(msg);
- success = true;
-
- } catch(IOException ee) {
- LOG.error("Reconnect to socket server and send message failed. Caused by " +
- ee.getMessage() + ". Retry time(s):" + retries);
-
try {
- synchronized (lock) {
- lock.wait(CONNECTION_RETRY_SLEEP);
+ if (client != null) {
+ client.close();
}
- } catch(InterruptedException eee) {
- break;
}
+ catch (IOException ee) {
+ LOG.error("Could not close socket from failed write attempt", ee);
+ }
+
+ // try again
+ retries++;
+
+ try {
+ // initialize a new connection
+ createConnection();
+
+ // re-try the write
+ outputStream.write(msg);
+
+ // success!
+ return;
+ }
+ catch (IOException ee) {
+ lastException = ee;
+ LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
+ }
+
+ // wait before re-attempting to connect
+ lock.wait(CONNECTION_RETRY_DELAY);
+ }
+
+ // throw an exception if the task is still running, otherwise simply leave the method
+ if (isRunning) {
+ throw new IOException("Failed to send message '" + value + "' to socket server at "
+ + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
}
- }
- if (!success) {
- throw new RuntimeException("Cannot send message " + value +
- " to socket server at " + hostName + ":" + port, e);
}
}
}
-
+
/**
- * Closes the connection of the Socket client.
+ * Closes the connection with the Socket server.
*/
- private void closeConnection(){
- try {
- isRunning = false;
-
- if (dataOutputStream != null) {
- dataOutputStream.close();
- }
-
- if (client != null && !client.isClosed()) {
- client.close();
- }
-
- if (lock != null) {
- synchronized (lock) {
- lock.notifyAll();
+ @Override
+ public void close() throws Exception {
+ // flag this as not running any more
+ isRunning = false;
+
+ // clean up in locked scope, so there is no concurrent change to the stream and client
+ synchronized (lock) {
+ // we notify first (this statement cannot fail). The notified thread will not continue
+ // anyways before it can re-acquire the lock
+ lock.notifyAll();
+
+ try {
+ if (outputStream != null) {
+ outputStream.close();
}
}
- } catch (IOException e) {
- throw new RuntimeException("Error while closing connection with socket server at "
- + hostName + ":" + port, e);
- } finally {
- if (client != null) {
- try {
+ finally {
+ if (client != null) {
client.close();
- } catch (IOException e) {
- throw new RuntimeException("Cannot close connection with socket server at "
- + hostName + ":" + port, e);
}
}
}
}
- /**
- * Initialize the connection with the Socket in the server.
- * @param parameters Configuration.
- */
- @Override
- public void open(Configuration parameters) {
- intializeConnection();
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private void createConnection() throws IOException {
+ client = new Socket(hostName, port);
+ client.setKeepAlive(true);
+ client.setTcpNoDelay(true);
+
+ outputStream = client.getOutputStream();
}
-
- /**
- * Closes the connection with the Socket server.
- */
- @Override
- public void close() {
- closeConnection();
+
+ // ------------------------------------------------------------------------
+ // For testing
+ // ------------------------------------------------------------------------
+
+ int getCurrentNumberOfRetries() {
+ return retries;
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/b9663c40/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index a9e1b6b..ee3d604 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -18,34 +18,36 @@
package org.apache.flink.streaming.api.functions.sink;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.Socket;
+import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.net.ServerSocket;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
*/
-public class SocketClientSinkTest{
-
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
- private final String host = "127.0.0.1";
- private int port;
- private long retry;
- private String value;
- private Thread t1;
- private ServerSocket server;
- private SocketClientSink<String> simpleSink;
- final private String except = "Cannot send message testSocketSinkInvoke";
+@SuppressWarnings("serial")
+public class SocketClientSinkTest extends TestLogger {
+
+ private static final String TEST_MESSAGE = "testSocketSinkInvoke";
+
+ private static final String EXCEPTION_MESSGAE = "Failed to send message '" + TEST_MESSAGE + "\n'";
+
+ private static final String host = "127.0.0.1";
+
private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
@Override
@@ -53,39 +55,38 @@ public class SocketClientSinkTest{
return element.getBytes();
}
};
-
- public SocketClientSinkTest() {
- }
+
@Test
- public void testSocketSink() throws Exception{
- error.set(null);
- value = "";
- server = new ServerSocket(0);
- port = server.getLocalPort();
+ public void testSocketSink() throws Exception {
+ final ServerSocket server = new ServerSocket(0);
+ final int port = server.getLocalPort();
- new Thread(new Runnable() {
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+ Thread sinkRunner = new Thread("Test sink runner") {
@Override
public void run() {
- t1 = Thread.currentThread();
-
try {
- simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+ SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0);
simpleSink.open(new Configuration());
- simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.invoke(TEST_MESSAGE + '\n');
simpleSink.close();
- } catch (Exception e){
- error.set(e);
+ }
+ catch (Throwable t){
+ error.set(t);
}
}
- }).start();
+ };
+
+ sinkRunner.start();
Socket sk = server.accept();
- BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
- .getInputStream()));
- value = rdr.readLine();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
+
+ String value = rdr.readLine();
- t1.join();
+ sinkRunner.join();
server.close();
if (error.get() != null) {
@@ -94,202 +95,240 @@ public class SocketClientSinkTest{
fail("Error in spawned thread: " + t.getMessage());
}
- assertEquals("testSocketSinkInvoke", value);
+ assertEquals(TEST_MESSAGE, value);
}
@Test
- public void testSocketSinkNoRetry() throws Exception{
- retry = -1L;
- error.set(null);
- server = new ServerSocket(0);
- port = server.getLocalPort();
+ public void testSinkAutoFlush() throws Exception {
+ final ServerSocket server = new ServerSocket(0);
+ final int port = server.getLocalPort();
- t1 = new Thread(new Runnable() {
+ final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
+ simpleSink.open(new Configuration());
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ Thread sinkRunner = new Thread("Test sink runner") {
@Override
public void run() {
try {
- t1 = Thread.currentThread();
- Socket sk = server.accept();
- sk.close();
- server.close();
- } catch (Exception e) {
- error.set(e);
+ // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke(TEST_MESSAGE + '\n');
+ }
+ catch (Throwable t){
+ error.set(t);
}
}
- });
- t1.start();
+ };
- simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+ sinkRunner.start();
- try {
- simpleSink.open(new Configuration());
-
- //wait socket server to close
- t1.join();
- if (error.get() == null) {
-
- //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
- simpleSink.invoke("testSocketSinkInvoke");
-
- //socket is closed then test "retry"
- simpleSink.invoke("testSocketSinkInvoke");
- simpleSink.close();
- }
- } catch (Exception e) {
+ Socket sk = server.accept();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
+ String value = rdr.readLine();
- //check whether throw a exception that reconnect failed.
- retry = simpleSink.retries;
- if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
- error.set(e);
- }
- }
+ sinkRunner.join();
+ simpleSink.close();
+ server.close();
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}
- assertEquals(0, retry);
+
+ assertEquals(TEST_MESSAGE, value);
}
@Test
- public void testSocketSinkRetryTenTimes() throws Exception{
- retry = -1L;
- error.set(null);
- server = new ServerSocket(0);
- port = server.getLocalPort();
+ public void testSocketSinkNoRetry() throws Exception {
+ final ServerSocket server = new ServerSocket(0);
+ final int port = server.getLocalPort();
+
+ try {
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+ Thread serverRunner = new Thread("Test server runner") {
+
+ @Override
+ public void run() {
+ try {
+ Socket sk = server.accept();
+ sk.close();
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ serverRunner.start();
+
+ SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
+ simpleSink.open(new Configuration());
+
+ // wait for the socket server to close
+ serverRunner.join();
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in server thread: " + t.getMessage());
+ }
+
+ try {
+ // socket should be closed, so this should trigger a re-try
+ // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke(TEST_MESSAGE + '\n');
+ simpleSink.invoke(TEST_MESSAGE + '\n');
+ fail("This should have failed with an exception");
+ }
+ catch (IOException e) {
+ // check that the exception message reports the failed send
+ assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
+ }
+ catch (Exception e) {
+ fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
+ }
+
+ assertEquals(0, simpleSink.getCurrentNumberOfRetries());
+ }
+ finally {
+ IOUtils.closeQuietly(server);
+ }
+ }
- t1 = new Thread(new Runnable() {
+ @Test
+ public void testSocketSinkRetryThreeTimes() throws Exception {
+ final ServerSocket server = new ServerSocket(0);
+ final int port = server.getLocalPort();
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ Thread serverRunner = new Thread("Test server runner") {
@Override
public void run() {
try {
- t1 = Thread.currentThread();
Socket sk = server.accept();
sk.close();
- server.close();
- } catch (Exception e) {
- error.set(e);
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ finally {
+ // close the server now to prevent reconnects
+ IOUtils.closeQuietly(server);
}
}
- });
- t1.start();
-
- simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
+ };
- try {
- simpleSink.open(new Configuration());
-
- //wait socket server to close
- t1.join();
- if (error.get() == null) {
-
- //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
- simpleSink.invoke("testSocketSinkInvoke");
-
- //socket is closed then test "retry"
- simpleSink.invoke("testSocketSinkInvoke");
- simpleSink.close();
- }
- } catch (Exception e) {
+ serverRunner.start();
- //check whether throw a exception that reconnect failed.
- retry = simpleSink.retries;
- if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
- error.set(e);
- }
- }
+ SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 3);
+ simpleSink.open(new Configuration());
+ // wait for the socket server to close
+ serverRunner.join();
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
- fail("Error in spawned thread: " + t.getMessage());
+ fail("Error in server thread: " + t.getMessage());
+ }
+
+ try {
+ // socket should be closed, so this should trigger a re-try
+ // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke(TEST_MESSAGE + '\n');
+ simpleSink.invoke(TEST_MESSAGE + '\n');
}
- assertEquals(10, retry);
+ catch (IOException e) {
+ // check that the exception message reports the failed send
+ assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
+ }
+ catch (Exception e) {
+ fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
+ }
+
+ assertEquals(3, simpleSink.getCurrentNumberOfRetries());
}
+ /**
+ * Tests that the sink successfully reconnects to the server.
+ * First the server is closed, which makes the sink start reconnecting.
+ * Meanwhile, the server is reopened so that the sink can reconnect to the socket.
+ */
@Test
- public void testSocketSinkRetryAccess() throws Exception{
+ public void testSocketSinkRetryAccess() throws Exception {
+ final ServerSocket server1 = new ServerSocket(0);
+ final int port = server1.getLocalPort();
- //This test the reconnect to server success.
- //First close the server and let the sink get reconnecting.
- //Meanwhile, reopen the server to let the sink reconnect success to socket.
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
- retry = -1L;
- error.set(null);
- server = new ServerSocket(0);
- port = server.getLocalPort();
-
- t1 = new Thread(new Runnable() {
+ // start a server, for the sink's open() method to connect against
+ // the server is immediately shut down again
+ Thread serverRunner = new Thread("Test server runner") {
@Override
public void run() {
try {
- t1 = Thread.currentThread();
- Socket sk = server.accept();
+ Socket sk = server1.accept();
sk.close();
- server.close();
- server = null;
- } catch (Exception e) {
- error.set(e);
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ finally {
+ IOUtils.closeQuietly(server1);
}
}
- });
- t1.start();
-
- try {
- simpleSink = new SocketClientSink<String>(host, port, simpleSchema, -1);
- simpleSink.open(new Configuration());
-
- t1.join();
- if (error.get() == null) {
+ };
+ serverRunner.start();
- //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
- simpleSink.invoke("testSocketSinkInvoke");
+ final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, -1, true);
+ simpleSink.open(new Configuration());
- new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- //socket is closed then test "retry"
- simpleSink.invoke("testSocketSinkInvoke");
- simpleSink.close();
- } catch (Exception e) {
- error.set(e);
- }
- }
+ // wait until the server is shut down
+ serverRunner.join();
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in server thread: " + t.getMessage());
+ }
- }).start();
+ // run some data output on the sink. this should fail due to the inactive server, but retry
+ Thread sinkRunner = new Thread("Test sink runner") {
- //set a new server to let the retry success.
- while (simpleSink.retries == 0) {
- Thread.sleep(1000);
+ @Override
+ public void run() {
+ try {
+ // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke(TEST_MESSAGE + '\n');
+ simpleSink.invoke(TEST_MESSAGE + '\n');
+ }
+ catch (Throwable t) {
+ error.set(t);
}
-
- //reopen socket server for sink access
- value = "";
- server = new ServerSocket(port);
- Socket sk = server.accept();
- BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
- .getInputStream()));
- value = rdr.readLine();
-
- retry = simpleSink.retries;
}
-
- } catch (Exception e) {
- error.set(e);
- }
-
+ };
+ sinkRunner.start();
+
+ // we start another server now, which will make the sink complete its task
+ ServerSocket server2 = new ServerSocket(port);
+ Socket sk = server2.accept();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
+ String value = rdr.readLine();
+ int retry = simpleSink.getCurrentNumberOfRetries();
+
+ // let the sink finish
+ sinkRunner.join();
+
+ // make sure that the sink did not throw an error
if (error.get() != null) {
Throwable t = error.get();
t.printStackTrace();
fail("Error in spawned thread: " + t.getMessage());
}
- assertEquals("testSocketSinkInvoke", value);
+ // validate state and results
+ assertEquals(TEST_MESSAGE, value);
assertTrue(retry > 0);
}
}
\ No newline at end of file
[2/2] flink git commit: [FLINK-2536] [streaming] Add a re-connect
attempt to socket sink
Posted by se...@apache.org.
[FLINK-2536] [streaming] Add a re-connect attempt to socket sink
This closes #1030
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd354ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd354ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd354ba4
Branch: refs/heads/master
Commit: fd354ba4dbd2a76688992b0a092ccc78b61f1088
Parents: 4abdbbb
Author: HuangWHWHW <40...@qq.com>
Authored: Thu Sep 10 11:23:31 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 17:06:43 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 2 +-
.../api/functions/sink/SocketClientSink.java | 83 +++++-
.../functions/sink/SocketClientSinkTest.java | 295 +++++++++++++++++++
3 files changed, 372 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d7cc1f5..d92498c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1028,7 +1028,7 @@ public class DataStream<T> {
* @return the closed DataStream
*/
public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema) {
- DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema));
+ DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index adffe5e..39f1ec0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -23,7 +23,10 @@ import java.io.OutputStream;
import java.net.Socket;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
@@ -31,6 +34,8 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
* @param <IN> data to be written into the Socket.
*/
public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+ protected static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
private static final long serialVersionUID = 1L;
private final String hostName;
@@ -38,6 +43,13 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
private final SerializationSchema<IN, byte[]> schema;
private transient Socket client;
private transient DataOutputStream dataOutputStream;
+ private long maxRetry;
+ private boolean retryForever;
+ private boolean isRunning;
+ protected long retries;
+ private final SerializableObject lock;
+
+ private static final int CONNECTION_RETRY_SLEEP = 1000;
/**
* Default constructor.
@@ -46,10 +58,15 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
* @param port Port of the Socket.
* @param schema Schema of the data.
*/
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, long maxRetry) {
this.hostName = hostName;
this.port = port;
this.schema = schema;
+ this.maxRetry = maxRetry;
+ this.retryForever = maxRetry < 0;
+ this.isRunning = false;
+ this.retries = 0;
+ this.lock = new SerializableObject();
}
/**
@@ -60,6 +77,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
try {
client = new Socket(hostName, port);
outputStream = client.getOutputStream();
+ isRunning = true;
} catch (IOException e) {
throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
}
@@ -73,13 +91,51 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
* The incoming data
*/
@Override
- public void invoke(IN value) {
+ public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
- throw new RuntimeException("Cannot send message " + value.toString() +
- " to socket server at " + hostName + ":" + port, e);
+ LOG.error("Cannot send message " + value +
+ " to socket server at " + hostName + ":" + port + ". Caused by " + e.getMessage() +
+ ". Trying to reconnect.", e);
+ retries = 0;
+ boolean success = false;
+ while ((retries < maxRetry || retryForever) && !success && isRunning){
+ try {
+
+ if (dataOutputStream != null) {
+ dataOutputStream.close();
+ }
+
+ if (client != null && !client.isClosed()) {
+ client.close();
+ }
+
+ retries++;
+
+ client = new Socket(hostName, port);
+ dataOutputStream = new DataOutputStream(client.getOutputStream());
+ dataOutputStream.write(msg);
+ success = true;
+
+ } catch(IOException ee) {
+ LOG.error("Reconnect to socket server and send message failed. Caused by " +
+ ee.getMessage() + ". Retry time(s):" + retries);
+
+ try {
+ synchronized (lock) {
+ lock.wait(CONNECTION_RETRY_SLEEP);
+ }
+ } catch(InterruptedException eee) {
+ break;
+ }
+ }
+ }
+ if (!success) {
+ throw new RuntimeException("Cannot send message " + value +
+ " to socket server at " + hostName + ":" + port, e);
+ }
}
}
@@ -88,8 +144,21 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
*/
private void closeConnection(){
try {
- dataOutputStream.close();
- client.close();
+ isRunning = false;
+
+ if (dataOutputStream != null) {
+ dataOutputStream.close();
+ }
+
+ if (client != null && !client.isClosed()) {
+ client.close();
+ }
+
+ if (lock != null) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
} catch (IOException e) {
throw new RuntimeException("Error while closing connection with socket server at "
+ hostName + ":" + port, e);
@@ -122,4 +191,4 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
closeConnection();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
new file mode 100644
index 0000000..a9e1b6b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ private final String host = "127.0.0.1";
+ private int port;
+ private long retry;
+ private String value;
+ private Thread t1;
+ private ServerSocket server;
+ private SocketClientSink<String> simpleSink;
+ final private String except = "Cannot send message testSocketSinkInvoke";
+
+ private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes();
+ }
+ };
+
+ public SocketClientSinkTest() {
+ }
+
+ @Test
+ public void testSocketSink() throws Exception{
+ error.set(null);
+ value = "";
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ t1 = Thread.currentThread();
+
+ try {
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+ simpleSink.open(new Configuration());
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ } catch (Exception e){
+ error.set(e);
+ }
+ }
+ }).start();
+
+ Socket sk = server.accept();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+ .getInputStream()));
+ value = rdr.readLine();
+
+ t1.join();
+ server.close();
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals("testSocketSinkInvoke", value);
+ }
+
+ @Test
+ public void testSocketSinkNoRetry() throws Exception{
+ retry = -1L;
+ error.set(null);
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ t1 = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ t1 = Thread.currentThread();
+ Socket sk = server.accept();
+ sk.close();
+ server.close();
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+ });
+ t1.start();
+
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+
+ try {
+ simpleSink.open(new Configuration());
+
+ //wait for the socket server to close
+ t1.join();
+ if (error.get() == null) {
+
+ //first send one message to get the connection out of its half-closed state (FIN_WAIT_2 / CLOSE_WAIT)
+ simpleSink.invoke("testSocketSinkInvoke");
+
+ //the socket is closed now, so this call tests the "retry" logic
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ }
+ } catch (Exception e) {
+
+ //check whether the exception thrown is the one reporting that the reconnect failed.
+ retry = simpleSink.retries;
+ if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+ error.set(e);
+ }
+ }
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+ assertEquals(0, retry);
+ }
+
+ @Test
+ public void testSocketSinkRetryTenTimes() throws Exception{
+ retry = -1L;
+ error.set(null);
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ t1 = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ t1 = Thread.currentThread();
+ Socket sk = server.accept();
+ sk.close();
+ server.close();
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+ });
+ t1.start();
+
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
+
+ try {
+ simpleSink.open(new Configuration());
+
+ //wait for the socket server to close
+ t1.join();
+ if (error.get() == null) {
+
+ //first send one message to get the connection out of its half-closed state (FIN_WAIT_2 / CLOSE_WAIT)
+ simpleSink.invoke("testSocketSinkInvoke");
+
+ //the socket is closed now, so this call tests the "retry" logic
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ }
+ } catch (Exception e) {
+
+ //check whether the exception thrown is the one reporting that the reconnect failed.
+ retry = simpleSink.retries;
+ if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+ error.set(e);
+ }
+ }
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+ assertEquals(10, retry);
+ }
+
+ @Test
+ public void testSocketSinkRetryAccess() throws Exception{
+
+ //This tests that reconnecting to the server succeeds.
+ //First close the server so that the sink starts reconnecting.
+ //Meanwhile, reopen the server so that the sink can reconnect to the socket successfully.
+
+ retry = -1L;
+ error.set(null);
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ t1 = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ t1 = Thread.currentThread();
+ Socket sk = server.accept();
+ sk.close();
+ server.close();
+ server = null;
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+ });
+ t1.start();
+
+ try {
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, -1);
+ simpleSink.open(new Configuration());
+
+ t1.join();
+ if (error.get() == null) {
+
+ //first send one message to get the connection out of its half-closed state (FIN_WAIT_2 / CLOSE_WAIT)
+ simpleSink.invoke("testSocketSinkInvoke");
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ //the socket is closed now, so this call tests the "retry" logic
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+
+ }).start();
+
+ //wait until the sink has started retrying, then set up a new server so that the retry succeeds.
+ while (simpleSink.retries == 0) {
+ Thread.sleep(1000);
+ }
+
+ //reopen the socket server so that the sink can connect to it
+ value = "";
+ server = new ServerSocket(port);
+ Socket sk = server.accept();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+ .getInputStream()));
+ value = rdr.readLine();
+
+ retry = simpleSink.retries;
+ }
+
+ } catch (Exception e) {
+ error.set(e);
+ }
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals("testSocketSinkInvoke", value);
+ assertTrue(retry > 0);
+ }
+}
\ No newline at end of file