You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 17:23:22 UTC
[2/2] flink git commit: [FLINK-2536] [streaming] Add a re-connect
attempty to socket sink
[FLINK-2536] [streaming] Add a re-connect attempty to socket sink
This closes #1030
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd354ba4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd354ba4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd354ba4
Branch: refs/heads/master
Commit: fd354ba4dbd2a76688992b0a092ccc78b61f1088
Parents: 4abdbbb
Author: HuangWHWHW <40...@qq.com>
Authored: Thu Sep 10 11:23:31 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 17:06:43 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 2 +-
.../api/functions/sink/SocketClientSink.java | 83 +++++-
.../functions/sink/SocketClientSinkTest.java | 295 +++++++++++++++++++
3 files changed, 372 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d7cc1f5..d92498c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1028,7 +1028,7 @@ public class DataStream<T> {
* @return the closed DataStream
*/
public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T, byte[]> schema) {
- DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema));
+ DataStreamSink<T> returnStream = addSink(new SocketClientSink<T>(hostName, port, schema, 0));
returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index adffe5e..39f1ec0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -23,7 +23,10 @@ import java.io.OutputStream;
import java.net.Socket;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
@@ -31,6 +34,8 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema;
* @param <IN> data to be written into the Socket.
*/
public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+ protected static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
private static final long serialVersionUID = 1L;
private final String hostName;
@@ -38,6 +43,13 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
private final SerializationSchema<IN, byte[]> schema;
private transient Socket client;
private transient DataOutputStream dataOutputStream;
+ private long maxRetry;
+ private boolean retryForever;
+ private boolean isRunning;
+ protected long retries;
+ private final SerializableObject lock;
+
+ private static final int CONNECTION_RETRY_SLEEP = 1000;
/**
* Default constructor.
@@ -46,10 +58,15 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
* @param port Port of the Socket.
* @param schema Schema of the data.
*/
- public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+ public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, long maxRetry) {
this.hostName = hostName;
this.port = port;
this.schema = schema;
+ this.maxRetry = maxRetry;
+ this.retryForever = maxRetry < 0;
+ this.isRunning = false;
+ this.retries = 0;
+ this.lock = new SerializableObject();
}
/**
@@ -60,6 +77,7 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
try {
client = new Socket(hostName, port);
outputStream = client.getOutputStream();
+ isRunning = true;
} catch (IOException e) {
throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
}
@@ -73,13 +91,51 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
* The incoming data
*/
@Override
- public void invoke(IN value) {
+ public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
- throw new RuntimeException("Cannot send message " + value.toString() +
- " to socket server at " + hostName + ":" + port, e);
+ LOG.error("Cannot send message " + value +
+ " to socket server at " + hostName + ":" + port + ". Caused by " + e.getMessage() +
+ ". Trying to reconnect.", e);
+ retries = 0;
+ boolean success = false;
+ while ((retries < maxRetry || retryForever) && !success && isRunning){
+ try {
+
+ if (dataOutputStream != null) {
+ dataOutputStream.close();
+ }
+
+ if (client != null && !client.isClosed()) {
+ client.close();
+ }
+
+ retries++;
+
+ client = new Socket(hostName, port);
+ dataOutputStream = new DataOutputStream(client.getOutputStream());
+ dataOutputStream.write(msg);
+ success = true;
+
+ } catch(IOException ee) {
+ LOG.error("Reconnect to socket server and send message failed. Caused by " +
+ ee.getMessage() + ". Retry time(s):" + retries);
+
+ try {
+ synchronized (lock) {
+ lock.wait(CONNECTION_RETRY_SLEEP);
+ }
+ } catch(InterruptedException eee) {
+ break;
+ }
+ }
+ }
+ if (!success) {
+ throw new RuntimeException("Cannot send message " + value +
+ " to socket server at " + hostName + ":" + port, e);
+ }
}
}
@@ -88,8 +144,21 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
*/
private void closeConnection(){
try {
- dataOutputStream.close();
- client.close();
+ isRunning = false;
+
+ if (dataOutputStream != null) {
+ dataOutputStream.close();
+ }
+
+ if (client != null && !client.isClosed()) {
+ client.close();
+ }
+
+ if (lock != null) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
} catch (IOException e) {
throw new RuntimeException("Error while closing connection with socket server at "
+ hostName + ":" + port, e);
@@ -122,4 +191,4 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
closeConnection();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fd354ba4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
new file mode 100644
index 0000000..a9e1b6b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
+ */
+public class SocketClientSinkTest{
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ private final String host = "127.0.0.1";
+ private int port;
+ private long retry;
+ private String value;
+ private Thread t1;
+ private ServerSocket server;
+ private SocketClientSink<String> simpleSink;
+ final private String except = "Cannot send message testSocketSinkInvoke";
+
+ private SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes();
+ }
+ };
+
+ public SocketClientSinkTest() {
+ }
+
+ @Test
+ public void testSocketSink() throws Exception{
+ error.set(null);
+ value = "";
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ t1 = Thread.currentThread();
+
+ try {
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+ simpleSink.open(new Configuration());
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ } catch (Exception e){
+ error.set(e);
+ }
+ }
+ }).start();
+
+ Socket sk = server.accept();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+ .getInputStream()));
+ value = rdr.readLine();
+
+ t1.join();
+ server.close();
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals("testSocketSinkInvoke", value);
+ }
+
+ @Test
+ public void testSocketSinkNoRetry() throws Exception{
+ retry = -1L;
+ error.set(null);
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ t1 = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ t1 = Thread.currentThread();
+ Socket sk = server.accept();
+ sk.close();
+ server.close();
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+ });
+ t1.start();
+
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
+
+ try {
+ simpleSink.open(new Configuration());
+
+ //wait socket server to close
+ t1.join();
+ if (error.get() == null) {
+
+ //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke("testSocketSinkInvoke");
+
+ //socket is closed then test "retry"
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ }
+ } catch (Exception e) {
+
+ //check whether throw a exception that reconnect failed.
+ retry = simpleSink.retries;
+ if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+ error.set(e);
+ }
+ }
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+ assertEquals(0, retry);
+ }
+
+ @Test
+ public void testSocketSinkRetryTenTimes() throws Exception{
+ retry = -1L;
+ error.set(null);
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ t1 = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ t1 = Thread.currentThread();
+ Socket sk = server.accept();
+ sk.close();
+ server.close();
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+ });
+ t1.start();
+
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
+
+ try {
+ simpleSink.open(new Configuration());
+
+ //wait socket server to close
+ t1.join();
+ if (error.get() == null) {
+
+ //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke("testSocketSinkInvoke");
+
+ //socket is closed then test "retry"
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ }
+ } catch (Exception e) {
+
+ //check whether throw a exception that reconnect failed.
+ retry = simpleSink.retries;
+ if (!(e instanceof RuntimeException) || e.toString().indexOf(except) == -1){
+ error.set(e);
+ }
+ }
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+ assertEquals(10, retry);
+ }
+
+ @Test
+ public void testSocketSinkRetryAccess() throws Exception{
+
+ //This test the reconnect to server success.
+ //First close the server and let the sink get reconnecting.
+ //Meanwhile, reopen the server to let the sink reconnect success to socket.
+
+ retry = -1L;
+ error.set(null);
+ server = new ServerSocket(0);
+ port = server.getLocalPort();
+
+ t1 = new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ t1 = Thread.currentThread();
+ Socket sk = server.accept();
+ sk.close();
+ server.close();
+ server = null;
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+ });
+ t1.start();
+
+ try {
+ simpleSink = new SocketClientSink<String>(host, port, simpleSchema, -1);
+ simpleSink.open(new Configuration());
+
+ t1.join();
+ if (error.get() == null) {
+
+ //firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
+ simpleSink.invoke("testSocketSinkInvoke");
+
+ new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ //socket is closed then test "retry"
+ simpleSink.invoke("testSocketSinkInvoke");
+ simpleSink.close();
+ } catch (Exception e) {
+ error.set(e);
+ }
+ }
+
+ }).start();
+
+ //set a new server to let the retry success.
+ while (simpleSink.retries == 0) {
+ Thread.sleep(1000);
+ }
+
+ //reopen socket server for sink access
+ value = "";
+ server = new ServerSocket(port);
+ Socket sk = server.accept();
+ BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
+ .getInputStream()));
+ value = rdr.readLine();
+
+ retry = simpleSink.retries;
+ }
+
+ } catch (Exception e) {
+ error.set(e);
+ }
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals("testSocketSinkInvoke", value);
+ assertTrue(retry > 0);
+ }
+}
\ No newline at end of file