You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/09/08 18:01:29 UTC

flink git commit: [FLINK-2490][streaming] fix retryForever check in SocketStreamFunction

Repository: flink
Updated Branches:
  refs/heads/master e96e5c0b5 -> 18004343b


[FLINK-2490][streaming] fix retryForever check in SocketStreamFunction

This closes #992.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18004343
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18004343
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18004343

Branch: refs/heads/master
Commit: 18004343ba28f83606432a70871c6264bb925544
Parents: e96e5c0
Author: HuangWHWHW <40...@qq.com>
Authored: Sun Sep 6 08:56:26 2015 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 8 18:01:02 2015 +0200

----------------------------------------------------------------------
 .../source/SocketTextStreamFunction.java        |  15 +-
 .../api/functions/SocketClientSinkTest.java     | 128 -------------
 .../source/SocketTextStreamFunctionTest.java    | 189 +++++++++++++++++++
 3 files changed, 197 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/18004343/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index a55a56d..6e7bcf6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -42,7 +42,8 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	private boolean retryForever;
 	private Socket socket;
 	private static final int CONNECTION_TIMEOUT_TIME = 0;
-	private static final int CONNECTION_RETRY_SLEEP = 1000;
+	static int CONNECTION_RETRY_SLEEP = 1000;
+	protected long retries;
 
 	private volatile boolean isRunning;
 
@@ -67,9 +68,9 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 		streamFromSocket(ctx, socket);
 	}
 
-	public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
+	private void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
 		try {
-			StringBuffer buffer = new StringBuffer();
+			StringBuilder buffer = new StringBuilder();
 			BufferedReader reader = new BufferedReader(new InputStreamReader(
 					socket.getInputStream()));
 
@@ -87,11 +88,11 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 
 				if (data == -1) {
 					socket.close();
-					long retry = 0;
 					boolean success = false;
-					while (retry < maxRetry && !success) {
+					retries = 0;
+					while ((retries < maxRetry  || retryForever) && !success) {
 						if (!retryForever) {
-							retry++;
+							retries++;
 						}
 						LOG.warn("Lost connection to server socket. Retrying in "
 								+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
@@ -118,7 +119,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 
 				if (data == delimiter) {
 					ctx.collect(buffer.toString());
-					buffer = new StringBuffer();
+					buffer = new StringBuilder();
 				} else if (data != '\r') { // ignore carriage return
 					buffer.append((char) data);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/18004343/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java
deleted file mode 100644
index b6b6181..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
- */
-public class SocketClientSinkTest{
-
-	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-	private final String host = "127.0.0.1";
-	private int port;
-	private String access;
-	private String value;
-	public SocketServer.ServerThread th;
-
-	public SocketClientSinkTest() {
-	}
-
-	class SocketServer extends Thread {
-
-		private ServerSocket server;
-		private Socket sk;
-		private BufferedReader rdr;
-
-		private SocketServer() {
-			try {
-				this.server = new ServerSocket(0);
-				port = server.getLocalPort();
-			} catch (Exception e) {
-				error.set(e);
-			}
-		}
-
-		public void run() {
-			try {
-				sk = server.accept();
-				access = "Connected";
-				th = new ServerThread(sk);
-				th.start();
-			} catch (Exception e) {
-				error.set(e);
-			}
-		}
-
-		class ServerThread extends Thread {
-			Socket sk;
-
-			public ServerThread(Socket sk) {
-				this.sk = sk;
-			}
-
-			public void run() {
-				try {
-					rdr = new BufferedReader(new InputStreamReader(sk
-							.getInputStream()));
-					value = rdr.readLine();
-				} catch (IOException e) {
-					error.set(e);
-				}
-			}
-		}
-	}
-
-	@Test
-	public void testSocketSink() throws Exception{
-
-		SocketServer server = new SocketServer();
-		server.start();
-
-		SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
-			@Override
-			public byte[] serialize(String element) {
-				return element.getBytes();
-			}
-		};
-
-		SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema);
-		simpleSink.open(new Configuration());
-		simpleSink.invoke("testSocketSinkInvoke");
-		simpleSink.close();
-
-		server.join();
-		th.join();
-
-		if (error.get() != null) {
-			Throwable t = error.get();
-			t.printStackTrace();
-			fail("Error in spawned thread: " + t.getMessage());
-		}
-
-		assertEquals("Connected", this.access);
-		assertEquals("testSocketSinkInvoke", value);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/18004343/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
new file mode 100644
index 0000000..5f16c00
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.DataOutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.verify;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+	private final String host = "127.0.0.1";
+	private final SourceFunction.SourceContext<String> ctx = Mockito.mock(SourceFunction.SourceContext.class);
+
+	public SocketTextStreamFunctionTest() {
+	}
+
+	class SocketSource extends Thread {
+
+		SocketTextStreamFunction socketSource;
+
+		public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception {
+			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
+		}
+
+		public void run() {
+			try {
+				this.socketSource.open(new Configuration());
+				this.socketSource.run(ctx);
+			}catch(Exception e){
+				error.set(e);
+			}
+		}
+
+		public void cancel(){
+			this.socketSource.cancel();
+		}
+	}
+
+	@Test
+	public void testSocketSourceRetryForever() throws Exception{
+		error.set(null);
+		ServerSocket serverSo = new ServerSocket(0);
+		SocketSource source = new SocketSource(serverSo, -1);
+		source.start();
+
+		int count = 0;
+		Socket channel;
+		while (count < 100) {
+			channel = serverSo.accept();
+			count++;
+			channel.close();
+			assertEquals(0, source.socketSource.retries);
+		}
+		source.cancel();
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals(100, count);
+	}
+
+	@Test
+	public void testSocketSourceRetryTenTimes() throws Exception{
+		error.set(null);
+		ServerSocket serverSo = new ServerSocket(0);
+		SocketSource source = new SocketSource(serverSo, 10);
+		source.socketSource.CONNECTION_RETRY_SLEEP = 200;
+
+		assertEquals(0, source.socketSource.retries);
+
+		source.start();
+
+		Socket channel;
+		channel = serverSo.accept();
+		channel.close();
+		serverSo.close();
+		while(source.socketSource.retries < 10){
+			long lastRetry = source.socketSource.retries;
+			sleep(100);
+			assertTrue(source.socketSource.retries >= lastRetry);
+		};
+		assertEquals(10, source.socketSource.retries);
+		source.cancel();
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals(10, source.socketSource.retries);
+	}
+
+	@Test
+	public void testSocketSourceNeverRetry() throws Exception{
+		error.set(null);
+		ServerSocket serverSo = new ServerSocket(0);
+		SocketSource source = new SocketSource(serverSo, 0);
+		source.start();
+
+		Socket channel;
+		channel = serverSo.accept();
+		channel.close();
+		serverSo.close();
+		sleep(2000);
+		source.cancel();
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals(0, source.socketSource.retries);
+	}
+
+	@Test
+	public void testSocketSourceRetryTenTimesWithFirstPass() throws Exception{
+		ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+
+		error.set(null);
+		ServerSocket serverSo = new ServerSocket(0);
+		SocketSource source = new SocketSource(serverSo, 10);
+		source.socketSource.CONNECTION_RETRY_SLEEP = 200;
+
+		assertEquals(0, source.socketSource.retries);
+
+		source.start();
+
+		Socket channel;
+		channel = serverSo.accept();
+		DataOutputStream dataOutputStream = new DataOutputStream(channel.getOutputStream());
+		dataOutputStream.write("testFirstSocketpass\n".getBytes());
+		channel.close();
+		serverSo.close();
+		while(source.socketSource.retries < 10){
+			long lastRetry = source.socketSource.retries;
+			sleep(100);
+			assertTrue(source.socketSource.retries >= lastRetry);
+		};
+		assertEquals(10, source.socketSource.retries);
+		source.cancel();
+
+		verify(ctx).collect(argument.capture());
+
+		if (error.get() != null) {
+			Throwable t = error.get();
+			t.printStackTrace();
+			fail("Error in spawned thread: " + t.getMessage());
+		}
+
+		assertEquals("testFirstSocketpass", argument.getValue());
+		assertEquals(10, source.socketSource.retries);
+	}
+}
\ No newline at end of file