You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 17:23:22 UTC

[2/2] flink git commit: [FLINK-2536] [streaming] Add a re-connect attempt to socket sink

[FLINK-2536] [streaming] Add a re-connect attempt to socket sink

This closes #1030


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd354ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd354ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd354ba4

Branch: refs/heads/master
Commit: fd354ba4dbd2a76688992b0a092ccc78b61f1088
Parents: 4abdbbb
Author: HuangWHWHW <40...@qq.com>
Authored: Thu Sep 10 11:23:31 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 17:06:43 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../api/functions/sink/SocketClientSink.java    |  83 +++++-
 .../functions/sink/SocketClientSinkTest.java    | 295 +++++++++++++++++++
 3 files changed, 372 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d7cc1f5..d92498c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1028,7 +1028,7 @@ public class DataStream<T> {
 	 * @return the closed DataStream
 	 */
 	public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema) {
-		DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema));
+		DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
 		returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index adffe5e..39f1ec0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -23,7 +23,10 @@ import java.io.OutputStream;
 import java.net.Socket;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
@@ -31,6 +34,8 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
  * @param <IN> data to be written into the Socket.
  */
 public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+	protected static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
 	private static final long serialVersionUID = 1L;
 
 	private final String hostName;
@@ -38,6 +43,13 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	private final SerializationSchema<IN, byte[]> schema;
 	private transient Socket client;
 	private transient DataOutputStream dataOutputStream;
+	private long maxRetry;
+	private boolean retryForever;
+	private boolean isRunning;
+	protected long retries;
+	private final SerializableObject lock;
+
+	private static final int CONNECTION_RETRY_SLEEP = 1000;
 
 	/**
 	 * Default constructor.
@@ -46,10 +58,15 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 * @param port Port of the Socket.
 	 * @param schema Schema of the data.
 	 */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, long maxRetry) {
 		this.hostName = hostName;
 		this.port = port;
 		this.schema = schema;
+		this.maxRetry = maxRetry;
+		this.retryForever = maxRetry < 0;
+		this.isRunning = false;
+		this.retries = 0;
+		this.lock = new SerializableObject();
 	}
 
 	/**
@@ -60,6 +77,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 		try {
 			client = new Socket(hostName, port);
 			outputStream = client.getOutputStream();
+			isRunning = true;
 		} catch (IOException e) {
 			throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
 		}
@@ -73,13 +91,51 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 *			The incoming data
 	 */
 	@Override
-	public void invoke(IN value) {
+	public void invoke(IN value) throws Exception {
 		byte[] msg = schema.serialize(value);
 		try {
 			dataOutputStream.write(msg);
 		} catch (IOException e) {
-			throw new RuntimeException("Cannot send message " + value.toString() +
-					" to socket server at " + hostName + ":" + port, e);
+			LOG.error("Cannot send message " + value +
+					" to socket server at " + hostName + ":" + port + ". Caused by " + e.getMessage() +
+					". Trying to reconnect.", e);
+			retries = 0;
+			boolean success = false;
+			while ((retries < maxRetry || retryForever) && !success && isRunning){
+				try {
+
+					if (dataOutputStream != null) {
+						dataOutputStream.close();
+					}
+
+					if (client != null && !client.isClosed()) {
+						client.close();
+					}
+
+					retries++;
+
+					client = new Socket(hostName, port);
+					dataOutputStream = new DataOutputStream(client.getOutputStream());
+					dataOutputStream.write(msg);
+					success = true;
+
+				} catch(IOException ee) {
+					LOG.error("Reconnect to socket server and send message failed. Caused by " +
+							ee.getMessage() + ". Retry time(s):" + retries);
+
+					try {
+						synchronized (lock) {
+							lock.wait(CONNECTION_RETRY_SLEEP);
+						}
+					} catch(InterruptedException eee) {
+						break;
+					}
+				}
+			}
+			if (!success) {
+				throw new RuntimeException("Cannot send message " + value +
+						" to socket server at " + hostName + ":" + port, e);
+			}
 		}
 	}
 
@@ -88,8 +144,21 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 	 */
 	private void closeConnection(){
 		try {
-			dataOutputStream.close();
-			client.close();
+			isRunning = false;
+
+			if (dataOutputStream != null) {
+				dataOutputStream.close();
+			}
+
+			if (client != null && !client.isClosed()) {
+				client.close();
+			}
+
+			if (lock != null) {
+				synchronized (lock) {
+					lock.notifyAll();
+				}
+			}
 		} catch (IOException e) {
 			throw new RuntimeException("Error while closing connection with socket server at "
 					+ hostName + ":" + port, e);
@@ -122,4 +191,4 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 		closeConnection();
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
new file mode 100644
index 0000000..a9e1b6b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+	private final String host = "127.0.0.1";
+	private int port;
+	private long retry;
+	private String value;
+	private Thread t1;
+	private ServerSocket server;
+	private SocketClientSink<String> simpleSink;
+	final private String except = "Cannot send message testSocketSinkInvoke";
+
+	private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+		@Override
+		public byte[] serialize(String element) {
+			return element.getBytes();
+		}
+	};
+
+	public SocketClientSinkTest() {
+	}
+
+	@Test
+	public void testSocketSink() throws Exception{
+		error.set(null);
+		value = "";
+		server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		new Thread(new Runnable() {
+			@Override
+			public void run() {
+				t1 = Thread.currentThread();
+
+				try {
+					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+					simpleSink.open(new Configuration());
+					simpleSink.invoke("testSocketSinkInvoke");
+					simpleSink.close();
+				} catch (Exception e){
+					error.set(e);
+				}
+			}
+		}).start();
+
+		Socket sk = server.accept();
+		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+				.getInputStream()));
+		value = rdr.readLine();
+
+		t1.join();
+		server.close();
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals("testSocketSinkInvoke", value);
+	}
+
+	@Test
+	public void testSocketSinkNoRetry() throws Exception{
+		retry = -1L;
+		error.set(null);
+		server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		t1 = new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					t1 = Thread.currentThread();
+					Socket sk = server.accept();
+					sk.close();
+					server.close();
+				} catch (Exception e) {
+					error.set(e);
+				}
+			}
+		});
+		t1.start();
+
+		simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+
+		try {
+			simpleSink.open(new Configuration());
+
+			//wait socket server to close
+			t1.join();
+			if (error.get() == null) {
+
+				//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+				simpleSink.invoke("testSocketSinkInvoke");
+
+				//socket is closed then test "retry"
+				simpleSink.invoke("testSocketSinkInvoke");
+				simpleSink.close();
+			}
+		} catch (Exception e) {
+
+			//check whether throw a exception that reconnect failed.
+			retry = simpleSink.retries;
+			if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+				error.set(e);
+			}
+		}
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+		assertEquals(0, retry);
+	}
+
+	@Test
+	public void testSocketSinkRetryTenTimes() throws Exception{
+		retry = -1L;
+		error.set(null);
+		server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		t1 = new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					t1 = Thread.currentThread();
+					Socket sk = server.accept();
+					sk.close();
+					server.close();
+				} catch (Exception e) {
+					error.set(e);
+				}
+			}
+		});
+		t1.start();
+
+		simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
+
+		try {
+			simpleSink.open(new Configuration());
+
+			//wait socket server to close
+			t1.join();
+			if (error.get() == null) {
+
+				//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+				simpleSink.invoke("testSocketSinkInvoke");
+
+				//socket is closed then test "retry"
+				simpleSink.invoke("testSocketSinkInvoke");
+				simpleSink.close();
+			}
+		} catch (Exception e) {
+
+			//check whether throw a exception that reconnect failed.
+			retry = simpleSink.retries;
+			if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+				error.set(e);
+			}
+		}
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+		assertEquals(10, retry);
+	}
+
+	@Test
+	public void testSocketSinkRetryAccess() throws Exception{
+
+		//This test the reconnect to server success.
+		//First close the server and let the sink get reconnecting.
+		//Meanwhile, reopen the server to let the sink reconnect success to socket.
+
+		retry = -1L;
+		error.set(null);
+		server = new ServerSocket(0);
+		port = server.getLocalPort();
+
+		t1 = new Thread(new Runnable() {
+
+			@Override
+			public void run() {
+				try {
+					t1 = Thread.currentThread();
+					Socket sk = server.accept();
+					sk.close();
+					server.close();
+					server = null;
+				} catch (Exception e) {
+					error.set(e);
+				}
+			}
+		});
+		t1.start();
+
+		try {
+			simpleSink = new SocketClientSink<String>(host, port, simpleSchema, -1);
+			simpleSink.open(new Configuration());
+
+			t1.join();
+			if (error.get() == null) {
+
+				//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+				simpleSink.invoke("testSocketSinkInvoke");
+
+				new Thread(new Runnable() {
+
+					@Override
+					public void run() {
+						try {
+							//socket is closed then test "retry"
+							simpleSink.invoke("testSocketSinkInvoke");
+							simpleSink.close();
+						} catch (Exception e) {
+							error.set(e);
+						}
+					}
+
+				}).start();
+
+				//set a new server to let the retry success.
+				while (simpleSink.retries == 0) {
+					Thread.sleep(1000);
+				}
+
+				//reopen socket server for sink access
+				value = "";
+				server = new ServerSocket(port);
+				Socket sk = server.accept();
+				BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+						.getInputStream()));
+				value = rdr.readLine();
+
+				retry = simpleSink.retries;
+			}
+
+		} catch (Exception e) {
+			error.set(e);
+		}
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals("testSocketSinkInvoke", value);
+		assertTrue(retry > 0);
+	}
+}
\ No newline at end of file