You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 17:23:21 UTC

[1/2] flink git commit: [FLINK-2536] [streaming] Cleanups and improvements on SocketClientSink

Repository: flink
Updated Branches:
  refs/heads/master 4abdbbb1b -> b9663c407


[FLINK-2536] [streaming] Cleanups and improvements on SocketClientSink


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9663c40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9663c40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9663c40

Branch: refs/heads/master
Commit: b9663c40754f522648f33dd7f56f2a47232c3836
Parents: fd354ba
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 21 16:36:35 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 17:06:43 2015 +0200

----------------------------------------------------------------------
 .../api/functions/sink/SocketClientSink.java    | 291 ++++++++------
 .../functions/sink/SocketClientSinkTest.java    | 381 ++++++++++---------
 2 files changed, 391 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9663c40/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 39f1ec0..96bc497 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.sink;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
@@ -25,170 +24,242 @@ import java.net.Socket;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
+ * <p>
+ * The sink can be set to retry message sends after the sending failed.
+ * <p>
+ * The sink can be set to 'autoflush', in which case the socket stream is flushed after every message. This
+ * significantly reduced throughput, but also decreases message latency.
  *
  * @param <IN> data to be written into the Socket.
  */
 public class SocketClientSink<IN> extends RichSinkFunction<IN> {
-	protected static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
 
 	private static final long serialVersionUID = 1L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
 
+	private static final int CONNECTION_RETRY_DELAY = 500;
+	
+	
+	private final SerializableObject lock = new SerializableObject();
+	private final SerializationSchema<IN, byte[]> schema;
 	private final String hostName;
 	private final int port;
-	private final SerializationSchema<IN, byte[]> schema;
+	private final int maxNumRetries;
+	private final boolean autoFlush;
+	
 	private transient Socket client;
-	private transient DataOutputStream dataOutputStream;
-	private long maxRetry;
-	private boolean retryForever;
-	private boolean isRunning;
-	protected long retries;
-	private final SerializableObject lock;
+	private transient OutputStream outputStream;
+	
+	private int retries;
 
-	private static final int CONNECTION_RETRY_SLEEP = 1000;
+	private volatile boolean isRunning = true;
+	
+	/**
+	 * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
+	 * and will not auto-flush the stream.
+	 *
+	 * @param hostName Hostname of the server to connect to.
+	 * @param port Port of the server.
+	 * @param schema Schema used to serialize the data into bytes.
+	 */
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+		this(hostName, port, schema, 0);
+	}
 
 	/**
-	 * Default constructor.
+	 * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
+	 * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
+	 * The sink will not auto-flush the stream.
 	 *
-	 * @param hostName Host of the Socket server.
-	 * @param port Port of the Socket.
-	 * @param schema Schema of the data.
+	 * @param hostName Hostname of the server to connect to.
+	 * @param port Port of the server.
+	 * @param schema Schema used to serialize the data into bytes.
+	 * @param maxNumRetries The maximum number of retries after a message send failed.
 	 */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, long maxRetry) {
-		this.hostName = hostName;
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, int maxNumRetries) {
+		this(hostName, port, schema, maxNumRetries, false);
+	}
+
+	/**
+	 * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
+	 * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
+	 *
+	 * @param hostName Hostname of the server to connect to.
+	 * @param port Port of the server.
+	 * @param schema Schema used to serialize the data into bytes.
+	 * @param maxNumRetries The maximum number of retries after a message send failed.
+	 * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
+	 */
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema,
+							int maxNumRetries, boolean autoflush)
+	{
+		checkArgument(port > 0 && port < 65536, "port is out of range");
+		checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+
+		this.hostName = checkNotNull(hostName, "hostname must not be null");
 		this.port = port;
-		this.schema = schema;
-		this.maxRetry = maxRetry;
-		this.retryForever = maxRetry < 0;
-		this.isRunning = false;
-		this.retries = 0;
-		this.lock = new SerializableObject();
+		this.schema = checkNotNull(schema);
+		this.maxNumRetries = maxNumRetries;
+		this.autoFlush = autoflush;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Life cycle
+	// ------------------------------------------------------------------------
+	
 	/**
-	 * Initializes the connection to Socket.
+	 * Initialize the connection with the Socket in the server.
+	 * @param parameters Configuration.
 	 */
-	public void intializeConnection() {
-		OutputStream outputStream;
+	@Override
+	public void open(Configuration parameters) throws Exception {
 		try {
-			client = new Socket(hostName, port);
-			outputStream = client.getOutputStream();
-			isRunning = true;
-		} catch (IOException e) {
-			throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
+			synchronized (lock) {
+				createConnection();
+			}
+		}
+		catch (IOException e) {
+			throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
 		}
-		dataOutputStream = new DataOutputStream(outputStream);
 	}
-
+	
+	
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Socket.
 	 *
-	 * @param value
-	 *			The incoming data
+	 * @param value The value to write to the socket.
 	 */
 	@Override
 	public void invoke(IN value) throws Exception {
 		byte[] msg = schema.serialize(value);
 		try {
-			dataOutputStream.write(msg);
-		} catch (IOException e) {
-			LOG.error("Cannot send message " + value +
-					" to socket server at " + hostName + ":" + port + ". Caused by " + e.getMessage() +
-					". Trying to reconnect.", e);
-			retries = 0;
-			boolean success = false;
-			while ((retries < maxRetry || retryForever) && !success && isRunning){
-				try {
-
-					if (dataOutputStream != null) {
-						dataOutputStream.close();
+			outputStream.write(msg);
+			if (autoFlush) {
+				outputStream.flush();
+			}
+		}
+		catch (IOException e) {
+			// if no re-tries are enable, fail immediately
+			if (maxNumRetries == 0) {
+				throw new IOException("Failed to send message '" + value + "' to socket server at "
+						+ hostName + ":" + port + ". Connection re-tries are not enabled.", e);
+			}
+			
+			LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port + 
+					". Trying to reconnect..." , e);
+			
+			// do the retries in locked scope, to guard against concurrent close() calls
+			// note that the first re-try comes immediately, without a wait!
+		
+			synchronized (lock) {
+				IOException lastException = null;
+				retries = 0;
+				
+				while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
+					
+					// first, clean up the old resources
+					try {
+						if (outputStream != null) {
+							outputStream.close();
+						}
 					}
-
-					if (client != null && !client.isClosed()) {
-						client.close();
+					catch (IOException ee) {
+						LOG.error("Could not close output stream from failed write attempt", ee);
 					}
-
-					retries++;
-
-					client = new Socket(hostName, port);
-					dataOutputStream = new DataOutputStream(client.getOutputStream());
-					dataOutputStream.write(msg);
-					success = true;
-
-				} catch(IOException ee) {
-					LOG.error("Reconnect to socket server and send message failed. Caused by " +
-							ee.getMessage() + ". Retry time(s):" + retries);
-
 					try {
-						synchronized (lock) {
-							lock.wait(CONNECTION_RETRY_SLEEP);
+						if (client != null) {
+							client.close();
 						}
-					} catch(InterruptedException eee) {
-						break;
 					}
+					catch (IOException ee) {
+						LOG.error("Could not close socket from failed write attempt", ee);
+					}
+					
+					// try again
+					retries++;
+					
+					try {
+						// initialize a new connection
+						createConnection();
+						
+						// re-try the write
+						outputStream.write(msg);
+						
+						// success!
+						return;
+					}
+					catch (IOException ee) {
+						lastException = ee;
+						LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
+					}
+
+					// wait before re-attempting to connect
+					lock.wait(CONNECTION_RETRY_DELAY);
+				}
+				
+				// throw an exception if the task is still running, otherwise simply leave the method
+				if (isRunning) {
+					throw new IOException("Failed to send message '" + value + "' to socket server at "
+							+ hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
 				}
-			}
-			if (!success) {
-				throw new RuntimeException("Cannot send message " + value +
-						" to socket server at " + hostName + ":" + port, e);
 			}
 		}
 	}
-
+	
 	/**
-	 * Closes the connection of the Socket client.
+	 * Closes the connection with the Socket server.
 	 */
-	private void closeConnection(){
-		try {
-			isRunning = false;
-
-			if (dataOutputStream != null) {
-				dataOutputStream.close();
-			}
-
-			if (client != null && !client.isClosed()) {
-				client.close();
-			}
-
-			if (lock != null) {
-				synchronized (lock) {
-					lock.notifyAll();
+	@Override
+	public void close() throws Exception {
+		// flag this as not running any more
+		isRunning = false;
+		
+		// clean up in locked scope, so there is no concurrent change to the stream and client
+		synchronized (lock) {
+			// we notify first (this statement cannot fail). The notified thread will not continue
+			// anyways before it can re-acquire the lock
+			lock.notifyAll();
+			
+			try {
+				if (outputStream != null) {
+					outputStream.close();
 				}
 			}
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing connection with socket server at "
-					+ hostName + ":" + port, e);
-		} finally {
-			if (client != null) {
-				try {
+			finally {
+				if (client != null) {
 					client.close();
-				} catch (IOException e) {
-					throw new RuntimeException("Cannot close connection with socket server at "
-							+ hostName + ":" + port, e);
 				}
 			}
 		}
 	}
 
-	/**
-	 * Initialize the connection with the Socket in the server.
-	 * @param parameters Configuration.
-	 */
-	@Override
-	public void open(Configuration parameters) {
-		intializeConnection();
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	private void createConnection() throws IOException {
+		client = new Socket(hostName, port);
+		client.setKeepAlive(true);
+		client.setTcpNoDelay(true);
+		
+		outputStream = client.getOutputStream();
 	}
-
-	/**
-	 * Closes the connection with the Socket server.
-	 */
-	@Override
-	public void close() {
-		closeConnection();
+	
+	// ------------------------------------------------------------------------
+	//  For testing
+	// ------------------------------------------------------------------------
+	
+	int getCurrentNumberOfRetries() {
+		return retries;
 	}
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b9663c40/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index a9e1b6b..ee3d604 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -18,34 +18,36 @@
 
 package org.apache.flink.streaming.api.functions.sink;
 
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.Socket;
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.net.ServerSocket;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
  */
-public class SocketClientSinkTest{
-
-	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-	private final String host = "127.0.0.1";
-	private int port;
-	private long retry;
-	private String value;
-	private Thread t1;
-	private ServerSocket server;
-	private SocketClientSink<String> simpleSink;
-	final private String except = "Cannot send message testSocketSinkInvoke";
+@SuppressWarnings("serial")
+public class SocketClientSinkTest extends TestLogger {
+
+	private static final String TEST_MESSAGE = "testSocketSinkInvoke";
+	
+	private static final String EXCEPTION_MESSGAE = "Failed to send message '" + TEST_MESSAGE + "\n'";
+
+	private static final String host = "127.0.0.1";
+	
 
 	private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
 		@Override
@@ -53,39 +55,38 @@ public class SocketClientSinkTest{
 			return element.getBytes();
 		}
 	};
-
-	public SocketClientSinkTest() {
-	}
+	
 
 	@Test
-	public void testSocketSink() throws Exception{
-		error.set(null);
-		value = "";
-		server = new ServerSocket(0);
-		port = server.getLocalPort();
+	public void testSocketSink() throws Exception {
+		final ServerSocket server = new ServerSocket(0);
+		final int port = server.getLocalPort();
 
-		new Thread(new Runnable() {
+		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+		
+		Thread sinkRunner = new Thread("Test sink runner") {
 			@Override
 			public void run() {
-				t1 = Thread.currentThread();
-
 				try {
-					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+					SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0);
 					simpleSink.open(new Configuration());
-					simpleSink.invoke("testSocketSinkInvoke");
+					simpleSink.invoke(TEST_MESSAGE + '\n');
 					simpleSink.close();
-				} catch (Exception e){
-					error.set(e);
+				}
+				catch (Throwable t){
+					error.set(t);
 				}
 			}
-		}).start();
+		};
+		
+		sinkRunner.start();
 
 		Socket sk = server.accept();
-		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
-				.getInputStream()));
-		value = rdr.readLine();
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
+		
+		String value = rdr.readLine();
 
-		t1.join();
+		sinkRunner.join();
 		server.close();
 
 		if (error.get() != null) {
@@ -94,202 +95,240 @@ public class SocketClientSinkTest{
 			fail("Error in spawned thread: " + t.getMessage());
 		}
 
-		assertEquals("testSocketSinkInvoke", value);
+		assertEquals(TEST_MESSAGE, value);
 	}
 
 	@Test
-	public void testSocketSinkNoRetry() throws Exception{
-		retry = -1L;
-		error.set(null);
-		server = new ServerSocket(0);
-		port = server.getLocalPort();
+	public void testSinkAutoFlush() throws Exception {
+		final ServerSocket server = new ServerSocket(0);
+		final int port = server.getLocalPort();
 
-		t1 = new Thread(new Runnable() {
+		final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
+		simpleSink.open(new Configuration());
+		
+		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 
+		Thread sinkRunner = new Thread("Test sink runner") {
 			@Override
 			public void run() {
 				try {
-					t1 = Thread.currentThread();
-					Socket sk = server.accept();
-					sk.close();
-					server.close();
-				} catch (Exception e) {
-					error.set(e);
+					// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+					simpleSink.invoke(TEST_MESSAGE + '\n');
+				}
+				catch (Throwable t){
+					error.set(t);
 				}
 			}
-		});
-		t1.start();
+		};
 
-		simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+		sinkRunner.start();
 
-		try {
-			simpleSink.open(new Configuration());
-
-			//wait socket server to close
-			t1.join();
-			if (error.get() == null) {
-
-				//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
-				simpleSink.invoke("testSocketSinkInvoke");
-
-				//socket is closed then test "retry"
-				simpleSink.invoke("testSocketSinkInvoke");
-				simpleSink.close();
-			}
-		} catch (Exception e) {
+		Socket sk = server.accept();
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
+		String value = rdr.readLine();
 
-			//check whether throw a exception that reconnect failed.
-			retry = simpleSink.retries;
-			if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
-				error.set(e);
-			}
-		}
+		sinkRunner.join();
+		simpleSink.close();
+		server.close();
 
 		if (error.get() != null) {
 			Throwable t = error.get();
 			t.printStackTrace();
 			fail("Error in spawned thread: " + t.getMessage());
 		}
-		assertEquals(0, retry);
+
+		assertEquals(TEST_MESSAGE, value);
 	}
 
 	@Test
-	public void testSocketSinkRetryTenTimes() throws Exception{
-		retry = -1L;
-		error.set(null);
-		server = new ServerSocket(0);
-		port = server.getLocalPort();
+	public void testSocketSinkNoRetry() throws Exception {
+		final ServerSocket server = new ServerSocket(0);
+		final int port = server.getLocalPort();
+		
+		try {
+			final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+	
+			Thread serverRunner = new Thread("Test server runner") {
+	
+				@Override
+				public void run() {
+					try {
+						Socket sk = server.accept();
+						sk.close();
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			serverRunner.start();
+	
+			SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0, true);
+			simpleSink.open(new Configuration());
+	
+			// wait socket server to close
+			serverRunner.join();
+			if (error.get() != null) {
+				Throwable t = error.get();
+				t.printStackTrace();
+				fail("Error in server thread: " + t.getMessage());
+			}
+			
+			try {
+				// socket should be closed, so this should trigger a re-try
+				// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+				simpleSink.invoke(TEST_MESSAGE + '\n');
+				simpleSink.invoke(TEST_MESSAGE + '\n');
+				fail("This should have failed with an exception");
+			}
+			catch (IOException e) {
+				// check whether throw a exception that reconnect failed.
+				assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
+			}
+			catch (Exception e) {
+				fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
+			}
+			
+			assertEquals(0, simpleSink.getCurrentNumberOfRetries());
+		}
+		finally {
+			IOUtils.closeQuietly(server);
+		}
+	}
 
-		t1 = new Thread(new Runnable() {
+	@Test
+	public void testSocketSinkRetryThreeTimes() throws Exception {
+		final ServerSocket server = new ServerSocket(0);
+		final int port = server.getLocalPort();
+		
+		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 
+		Thread serverRunner = new Thread("Test server runner") {
 			@Override
 			public void run() {
 				try {
-					t1 = Thread.currentThread();
 					Socket sk = server.accept();
 					sk.close();
-					server.close();
-				} catch (Exception e) {
-					error.set(e);
+				}
+				catch (Throwable t) {
+					error.set(t);
+				}
+				finally {
+					// close the server now to prevent reconnects
+					IOUtils.closeQuietly(server);
 				}
 			}
-		});
-		t1.start();
-
-		simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
+		};
 
-		try {
-			simpleSink.open(new Configuration());
-
-			//wait socket server to close
-			t1.join();
-			if (error.get() == null) {
-
-				//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
-				simpleSink.invoke("testSocketSinkInvoke");
-
-				//socket is closed then test "retry"
-				simpleSink.invoke("testSocketSinkInvoke");
-				simpleSink.close();
-			}
-		} catch (Exception e) {
+		serverRunner.start();
 
-			//check whether throw a exception that reconnect failed.
-			retry = simpleSink.retries;
-			if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
-				error.set(e);
-			}
-		}
+		SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 3);
+		simpleSink.open(new Configuration());
 
+		// wait socket server to close
+		serverRunner.join();
 		if (error.get() != null) {
 			Throwable t = error.get();
 			t.printStackTrace();
-			fail("Error in spawned thread: " + t.getMessage());
+			fail("Error in server thread: " + t.getMessage());
+		}
+
+		try {
+			// socket should be closed, so this should trigger a re-try
+			// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+			simpleSink.invoke(TEST_MESSAGE + '\n');
+			simpleSink.invoke(TEST_MESSAGE + '\n');
 		}
-		assertEquals(10, retry);
+		catch (IOException e) {
+			// check whether throw a exception that reconnect failed.
+			assertTrue("Wrong exception", e.getMessage().contains(EXCEPTION_MESSGAE));
+		}
+		catch (Exception e) {
+			fail("wrong exception: " + e.getClass().getName() + " - " + e.getMessage());
+		}
+
+		assertEquals(3, simpleSink.getCurrentNumberOfRetries());
 	}
 
+	/**
+	 * This test the reconnect to server success.
+	 * First close the server and let the sink get reconnecting.
+	 * Meanwhile, reopen the server to let the sink reconnect success to socket.
+	 */
 	@Test
-	public void testSocketSinkRetryAccess() throws Exception{
+	public void testSocketSinkRetryAccess() throws Exception {
+		final ServerSocket server1 = new ServerSocket(0);
+		final int port = server1.getLocalPort();
 
-		//This test the reconnect to server success.
-		//First close the server and let the sink get reconnecting.
-		//Meanwhile, reopen the server to let the sink reconnect success to socket.
+		final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
 
-		retry = -1L;
-		error.set(null);
-		server = new ServerSocket(0);
-		port = server.getLocalPort();
-
-		t1 = new Thread(new Runnable() {
+		// start a server, for the sink's open() method to connect against
+		// the server is immediately shut down again
+		Thread serverRunner = new Thread("Test server runner") {
 
 			@Override
 			public void run() {
 				try {
-					t1 = Thread.currentThread();
-					Socket sk = server.accept();
+					Socket sk = server1.accept();
 					sk.close();
-					server.close();
-					server = null;
-				} catch (Exception e) {
-					error.set(e);
+				}
+				catch (Throwable t) {
+					error.set(t);
+				}
+				finally {
+					IOUtils.closeQuietly(server1);
 				}
 			}
-		});
-		t1.start();
-
-		try {
-			simpleSink = new SocketClientSink<String>(host, port, simpleSchema, -1);
-			simpleSink.open(new Configuration());
-
-			t1.join();
-			if (error.get() == null) {
+		};
+		serverRunner.start();
 
-				//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
-				simpleSink.invoke("testSocketSinkInvoke");
+		final SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, -1, true);
+		simpleSink.open(new Configuration());
 
-				new Thread(new Runnable() {
-
-					@Override
-					public void run() {
-						try {
-							//socket is closed then test "retry"
-							simpleSink.invoke("testSocketSinkInvoke");
-							simpleSink.close();
-						} catch (Exception e) {
-							error.set(e);
-						}
-					}
+		// wait until the server is shut down
+		serverRunner.join();
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in server thread: " + t.getMessage());
+		}
 
-				}).start();
+		// run some data output on the sink. this should fail due to the inactive server, but retry
+		Thread sinkRunner = new Thread("Test sink runner") {
 
-				//set a new server to let the retry success.
-				while (simpleSink.retries == 0) {
-					Thread.sleep(1000);
+			@Override
+			public void run() {
+				try {
+					// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+					simpleSink.invoke(TEST_MESSAGE + '\n');
+					simpleSink.invoke(TEST_MESSAGE + '\n');
+				}
+				catch (Throwable t) {
+					error.set(t);
 				}
-
-				//reopen socket server for sink access
-				value = "";
-				server = new ServerSocket(port);
-				Socket sk = server.accept();
-				BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
-						.getInputStream()));
-				value = rdr.readLine();
-
-				retry = simpleSink.retries;
 			}
-
-		} catch (Exception e) {
-			error.set(e);
-		}
-
+		};
+		sinkRunner.start();
+		
+		// we start another server now, which will make the sink complete its task
+		ServerSocket server2 = new ServerSocket(port);
+		Socket sk = server2.accept();
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk.getInputStream()));
+		String value = rdr.readLine();
+		int retry = simpleSink.getCurrentNumberOfRetries();
+		
+		// let the sink finish
+		sinkRunner.join();
+
+		// make sure that the sink did not throw an error
 		if (error.get() != null) {
 			Throwable t = error.get();
 			t.printStackTrace();
 			fail("Error in spawned thread: " + t.getMessage());
 		}
 
-		assertEquals("testSocketSinkInvoke", value);
+		// validate state and results
+		assertEquals(TEST_MESSAGE, value);
 		assertTrue(retry > 0);
 	}
 }
\ No newline at end of file


[2/2] flink git commit: [FLINK-2536] [streaming] Add a re-connect attempt to socket sink

Posted by se...@apache.org.
[FLINK-2536] [streaming] Add a re-connect attempt to socket sink

This closes #1030


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd354ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd354ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd354ba4

Branch: refs/heads/master
Commit: fd354ba4dbd2a76688992b0a092ccc78b61f1088
Parents: 4abdbbb
Author: HuangWHWHW <40...@qq.com>
Authored: Thu Sep 10 11:23:31 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 17:06:43 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../api/functions/sink/SocketClientSink.java    |  83 +++++-
 .../functions/sink/SocketClientSinkTest.java    | 295 +++++++++++++++++++
 3 files changed, 372 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d7cc1f5..d92498c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1028,7 +1028,7 @@ public class DataStream<T> {
 	 * @return the closed DataStream
 	 */
 	public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema) {
-		DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema));
+		DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
 		returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index adffe5e..39f1ec0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -23,7 +23,10 @@ import java.io.OutputStream;
 import java.net.Socket;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
@@ -31,6 +34,8 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
  * @param <IN> data to be written into the Socket.
  */
 public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+	protected static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
 	private static final long serialVersionUID = 1L;
 
 	private final String hostName;
@@ -38,6 +43,13 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	private final SerializationSchema<IN, byte[]> schema;
 	private transient Socket client;
 	private transient DataOutputStream dataOutputStream;
+	private long maxRetry;
+	private boolean retryForever;
+	private boolean isRunning;
+	protected long retries;
+	private final SerializableObject lock;
+
+	private static final int CONNECTION_RETRY_SLEEP = 1000;
 
 	/**
 	 * Default constructor.
@@ -46,10 +58,15 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 * @param port Port of the Socket.
 	 * @param schema Schema of the data.
 	 */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, long maxRetry) {
 		this.hostName = hostName;
 		this.port = port;
 		this.schema = schema;
+		this.maxRetry = maxRetry;
+		this.retryForever = maxRetry < 0;
+		this.isRunning = false;
+		this.retries = 0;
+		this.lock = new SerializableObject();
 	}
 
 	/**
@@ -60,6 +77,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 		try {
 			client = new Socket(hostName, port);
 			outputStream = client.getOutputStream();
+			isRunning = true;
 		} catch (IOException e) {
 			throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
 		}
@@ -73,13 +91,51 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 *			The incoming data
 	 */
 	@Override
-	public void invoke(IN value) {
+	public void invoke(IN value) throws Exception { // sends one serialized record; on a write failure it reconnects and resends as configured by maxRetry
 		byte[] msg = schema.serialize(value);
 		try {
 			dataOutputStream.write(msg);
 		} catch (IOException e) {
-			throw new RuntimeException("Cannot send message " + value.toString() +
-					" to socket server at " + hostName + ":" + port, e);
+			LOG.error("Cannot send message " + value +
+					" to socket server at " + hostName + ":" + port + ". Caused by " + e.getMessage() +
+					". Trying to reconnect.", e);
+			retries = 0; // the attempt counter is restarted for every message that fails
+			boolean success = false;
+			while ((retries < maxRetry || retryForever) && !success && isRunning){ // retryForever is set when maxRetry < 0; isRunning is cleared by closeConnection()
+				try {
+
+					if (dataOutputStream != null) {
+						dataOutputStream.close();
+					}
+
+					if (client != null && !client.isClosed()) {
+						client.close();
+					}
+
+					retries++; // incremented before connecting, so failed attempts are counted as well
+
+					client = new Socket(hostName, port);
+					dataOutputStream = new DataOutputStream(client.getOutputStream());
+					dataOutputStream.write(msg); // resend the record that originally failed
+					success = true;
+
+				} catch(IOException ee) {
+					LOG.error("Reconnect to socket server and send message failed. Caused by " +
+							ee.getMessage() + ". Retry time(s):" + retries);
+
+					try {
+						synchronized (lock) {
+							lock.wait(CONNECTION_RETRY_SLEEP); // back off before the next attempt; closeConnection() calls notifyAll() on this lock
+						}
+					} catch(InterruptedException eee) {
+						break; // NOTE(review): the interrupt status is not restored here (no Thread.currentThread().interrupt())
+					}
+				}
+			}
+			if (!success) {
+				throw new RuntimeException("Cannot send message " + value +
+						" to socket server at " + hostName + ":" + port, e); // the cause is the first write failure, not the last reconnect failure
+			}
 		}
 	}
 
@@ -88,8 +144,21 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 */
 	private void closeConnection(){
 		try {
-			dataOutputStream.close();
-			client.close();
+			isRunning = false;
+
+			if (dataOutputStream != null) {
+				dataOutputStream.close();
+			}
+
+			if (client != null && !client.isClosed()) {
+				client.close();
+			}
+
+			if (lock != null) {
+				synchronized (lock) {
+					lock.notifyAll();
+				}
+			}
 		} catch (IOException e) {
 			throw new RuntimeException("Error while closing connection with socket server at "
 					+ hostName + ":" + port, e);
@@ -122,4 +191,4 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 		closeConnection();
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
new file mode 100644
index 0000000..a9e1b6b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+	final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); // failure captured by a helper thread or by the test body; checked before the final assertions
+	private final String host = "127.0.0.1";
+	private int port; // local port of the test server, taken from server.getLocalPort()
+	private long retry; // snapshot of simpleSink.retries taken by the test
+	private String value; // line read on the server side of the connection
+	private Thread t1; // helper thread that the test thread join()s
+	private ServerSocket server;
+	private SocketClientSink<String> simpleSink;
+	final private String except = "Cannot send message testSocketSinkInvoke"; // fragment looked for in the toString() of the exception thrown by the sink
+
+	private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+		@Override
+		public byte[] serialize(String element) {
+			return element.getBytes(); // NOTE(review): String.getBytes() uses the platform default charset
+		}
+	};
+
+	public SocketClientSinkTest() {
+	}
+
+	@Test
+	public void testSocketSink() throws Exception{ // happy path: the sink connects, sends one message, and the server side reads it back
+		error.set(null);
+		value = "";
+		server = new ServerSocket(0); // port 0: the port number is allocated automatically and read back below
+		port = server.getLocalPort();
+
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				t1 = Thread.currentThread(); // publish this worker thread so that the test thread can join() it
+
+				try {
+					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+					simpleSink.open(new Configuration());
+					simpleSink.invoke("testSocketSinkInvoke");
+					simpleSink.close();
+				} catch (Exception e){
+					error.set(e); // hand the failure over to the test thread
+				}
+			}
+		}).start();
+
+		Socket sk = server.accept();
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+				.getInputStream()));
+		value = rdr.readLine(); // the message has no line terminator, so this presumably returns only once the sink closes the connection
+
+		t1.join(); // NOTE(review): t1 is assigned inside the worker thread without synchronization — confirm it is always visible here
+		server.close();
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals("testSocketSinkInvoke", value);
+	}
+
+	@Test
+	public void testSocketSinkNoRetry() throws Exception{ // with maxRetry = 0 a failed send must be reported without any reconnect attempt
+		retry = -1L;
+		error.set(null);
+		server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		t1 = new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					t1 = Thread.currentThread();
+					Socket sk = server.accept();
+					sk.close();
+					server.close();
+				} catch (Exception e) {
+					error.set(e);
+				}
+			}
+		});
+		t1.start();
+
+		simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0); // maxRetry = 0: no reconnect attempts
+
+		try {
+			simpleSink.open(new Configuration());
+
+			//wait for the server thread to accept the connection and shut the server down
+			t1.join();
+			if (error.get() == null) {
+
+				//first write after the server closed the connection; presumably it can still succeed locally and only a later write fails — TODO confirm
+				simpleSink.invoke("testSocketSinkInvoke");
+
+				//the connection is closed by now, so this write is expected to fail and reach the sink's retry logic
+				simpleSink.invoke("testSocketSinkInvoke");
+				simpleSink.close();
+			}
+		} catch (Exception e) {
+
+			//expect the sink's "Cannot send message" RuntimeException; anything else is recorded as a test error
+			retry = simpleSink.retries;
+			if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+				error.set(e);
+			}
+		}
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+		assertEquals(0, retry); // retry stays at -1 if no exception was thrown, so this also fails when the send unexpectedly succeeds
+	}
+
+	@Test
+	public void testSocketSinkRetryTenTimes() throws Exception{ // with maxRetry = 10 the sink must give up after exactly 10 reconnect attempts
+		retry = -1L;
+		error.set(null);
+		server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		t1 = new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					t1 = Thread.currentThread();
+					Socket sk = server.accept();
+					sk.close();
+					server.close();
+				} catch (Exception e) {
+					error.set(e);
+				}
+			}
+		});
+		t1.start();
+
+		simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10); // NOTE(review): the sink waits between failed attempts, so this test is presumably slow — confirm the wait time
+
+		try {
+			simpleSink.open(new Configuration());
+
+			//wait for the server thread to accept the connection and shut the server down
+			t1.join();
+			if (error.get() == null) {
+
+				//first write after the server closed the connection; presumably it can still succeed locally and only a later write fails — TODO confirm
+				simpleSink.invoke("testSocketSinkInvoke");
+
+				//the connection is closed by now, so this write is expected to fail and reach the sink's retry logic
+				simpleSink.invoke("testSocketSinkInvoke");
+				simpleSink.close();
+			}
+		} catch (Exception e) {
+
+			//expect the sink's "Cannot send message" RuntimeException; anything else is recorded as a test error
+			retry = simpleSink.retries;
+			if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+				error.set(e);
+			}
+		}
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+		assertEquals(10, retry); // retry stays at -1 if no exception was thrown, so this also fails when the send unexpectedly succeeds
+	}
+
+	@Test
+	public void testSocketSinkRetryAccess() throws Exception{
+
+		//This test checks that the sink can successfully reconnect to the server.
+		//First, the server is closed so that the sink starts reconnecting.
+		//Meanwhile, the server is reopened on the same port so that a reconnect attempt succeeds.
+
+		retry = -1L;
+		error.set(null);
+		server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		t1 = new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					t1 = Thread.currentThread();
+					Socket sk = server.accept();
+					sk.close();
+					server.close();
+					server = null;
+				} catch (Exception e) {
+					error.set(e);
+				}
+			}
+		});
+		t1.start();
+
+		try {
+			simpleSink = new SocketClientSink<String>(host, port, simpleSchema, -1); // a negative maxRetry makes the sink retry forever
+			simpleSink.open(new Configuration());
+
+			t1.join();
+			if (error.get() == null) {
+
+				//first write after the server closed the connection; presumably it can still succeed locally and only a later write fails — TODO confirm
+				simpleSink.invoke("testSocketSinkInvoke");
+
+				new Thread(new Runnable() {
+
+					@Override
+					public void run() {
+						try {
+							//the connection is closed by now, so this write is expected to fail and start the sink's reconnect loop
+							simpleSink.invoke("testSocketSinkInvoke");
+							simpleSink.close();
+						} catch (Exception e) {
+							error.set(e);
+						}
+					}
+
+				}).start();
+
+				//wait until the sink has made at least one reconnect attempt
+				while (simpleSink.retries == 0) { // NOTE(review): retries is a plain field written by the other thread — confirm this read is guaranteed to see the update
+					Thread.sleep(1000);
+				}
+
+				//reopen the socket server on the same port so that the sink's next reconnect attempt succeeds
+				value = "";
+				server = new ServerSocket(port);
+				Socket sk = server.accept();
+				BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+						.getInputStream()));
+				value = rdr.readLine();
+
+				retry = simpleSink.retries;
+			}
+
+		} catch (Exception e) {
+			error.set(e);
+		}
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals("testSocketSinkInvoke", value);
+		assertTrue(retry > 0);
+	}
+}
\ No newline at end of file