You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/09/08 18:01:29 UTC
flink git commit: [FLINK-2490][streaming] fix retryForever check in SocketStreamFunction
Repository: flink
Updated Branches:
refs/heads/master e96e5c0b5 -> 18004343b
[FLINK-2490][streaming] fix retryForever check in SocketStreamFunction
This closes #992.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18004343
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18004343
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18004343
Branch: refs/heads/master
Commit: 18004343ba28f83606432a70871c6264bb925544
Parents: e96e5c0
Author: HuangWHWHW <40...@qq.com>
Authored: Sun Sep 6 08:56:26 2015 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Sep 8 18:01:02 2015 +0200
----------------------------------------------------------------------
.../source/SocketTextStreamFunction.java | 15 +-
.../api/functions/SocketClientSinkTest.java | 128 -------------
.../source/SocketTextStreamFunctionTest.java | 189 +++++++++++++++++++
3 files changed, 197 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/18004343/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index a55a56d..6e7bcf6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -42,7 +42,8 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
private boolean retryForever;
private Socket socket;
private static final int CONNECTION_TIMEOUT_TIME = 0;
- private static final int CONNECTION_RETRY_SLEEP = 1000;
+ static int CONNECTION_RETRY_SLEEP = 1000;
+ protected long retries;
private volatile boolean isRunning;
@@ -67,9 +68,9 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
streamFromSocket(ctx, socket);
}
- public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
+ private void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
try {
- StringBuffer buffer = new StringBuffer();
+ StringBuilder buffer = new StringBuilder();
BufferedReader reader = new BufferedReader(new InputStreamReader(
socket.getInputStream()));
@@ -87,11 +88,11 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
if (data == -1) {
socket.close();
- long retry = 0;
boolean success = false;
- while (retry < maxRetry && !success) {
+ retries = 0;
+ while ((retries < maxRetry || retryForever) && !success) {
if (!retryForever) {
- retry++;
+ retries++;
}
LOG.warn("Lost connection to server socket. Retrying in "
+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
@@ -118,7 +119,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
if (data == delimiter) {
ctx.collect(buffer.toString());
- buffer = new StringBuffer();
+ buffer = new StringBuilder();
} else if (data != '\r') { // ignore carriage return
buffer.append((char) data);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/18004343/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java
deleted file mode 100644
index b6b6181..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
- */
-public class SocketClientSinkTest{
-
- final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
- private final String host = "127.0.0.1";
- private int port;
- private String access;
- private String value;
- public SocketServer.ServerThread th;
-
- public SocketClientSinkTest() {
- }
-
- class SocketServer extends Thread {
-
- private ServerSocket server;
- private Socket sk;
- private BufferedReader rdr;
-
- private SocketServer() {
- try {
- this.server = new ServerSocket(0);
- port = server.getLocalPort();
- } catch (Exception e) {
- error.set(e);
- }
- }
-
- public void run() {
- try {
- sk = server.accept();
- access = "Connected";
- th = new ServerThread(sk);
- th.start();
- } catch (Exception e) {
- error.set(e);
- }
- }
-
- class ServerThread extends Thread {
- Socket sk;
-
- public ServerThread(Socket sk) {
- this.sk = sk;
- }
-
- public void run() {
- try {
- rdr = new BufferedReader(new InputStreamReader(sk
- .getInputStream()));
- value = rdr.readLine();
- } catch (IOException e) {
- error.set(e);
- }
- }
- }
- }
-
- @Test
- public void testSocketSink() throws Exception{
-
- SocketServer server = new SocketServer();
- server.start();
-
- SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
- @Override
- public byte[] serialize(String element) {
- return element.getBytes();
- }
- };
-
- SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema);
- simpleSink.open(new Configuration());
- simpleSink.invoke("testSocketSinkInvoke");
- simpleSink.close();
-
- server.join();
- th.join();
-
- if (error.get() != null) {
- Throwable t = error.get();
- t.printStackTrace();
- fail("Error in spawned thread: " + t.getMessage());
- }
-
- assertEquals("Connected", this.access);
- assertEquals("testSocketSinkInvoke", value);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/18004343/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
new file mode 100644
index 0000000..5f16c00
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.DataOutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static java.lang.Thread.sleep;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.verify;
+
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
+ */
+public class SocketTextStreamFunctionTest{
+
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ private final String host = "127.0.0.1";
+ private final SourceFunction.SourceContext<String> ctx = Mockito.mock(SourceFunction.SourceContext.class);
+
+ public SocketTextStreamFunctionTest() {
+ }
+
+ class SocketSource extends Thread {
+
+ SocketTextStreamFunction socketSource;
+
+ public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception {
+ this.socketSource = new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
+ }
+
+ public void run() {
+ try {
+ this.socketSource.open(new Configuration());
+ this.socketSource.run(ctx);
+ }catch(Exception e){
+ error.set(e);
+ }
+ }
+
+ public void cancel(){
+ this.socketSource.cancel();
+ }
+ }
+
+ @Test
+ public void testSocketSourceRetryForever() throws Exception{
+ error.set(null);
+ ServerSocket serverSo = new ServerSocket(0);
+ SocketSource source = new SocketSource(serverSo, -1);
+ source.start();
+
+ int count = 0;
+ Socket channel;
+ while (count < 100) {
+ channel = serverSo.accept();
+ count++;
+ channel.close();
+ assertEquals(0, source.socketSource.retries);
+ }
+ source.cancel();
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals(100, count);
+ }
+
+ @Test
+ public void testSocketSourceRetryTenTimes() throws Exception{
+ error.set(null);
+ ServerSocket serverSo = new ServerSocket(0);
+ SocketSource source = new SocketSource(serverSo, 10);
+ source.socketSource.CONNECTION_RETRY_SLEEP = 200;
+
+ assertEquals(0, source.socketSource.retries);
+
+ source.start();
+
+ Socket channel;
+ channel = serverSo.accept();
+ channel.close();
+ serverSo.close();
+ while(source.socketSource.retries < 10){
+ long lastRetry = source.socketSource.retries;
+ sleep(100);
+ assertTrue(source.socketSource.retries >= lastRetry);
+ };
+ assertEquals(10, source.socketSource.retries);
+ source.cancel();
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals(10, source.socketSource.retries);
+ }
+
+ @Test
+ public void testSocketSourceNeverRetry() throws Exception{
+ error.set(null);
+ ServerSocket serverSo = new ServerSocket(0);
+ SocketSource source = new SocketSource(serverSo, 0);
+ source.start();
+
+ Socket channel;
+ channel = serverSo.accept();
+ channel.close();
+ serverSo.close();
+ sleep(2000);
+ source.cancel();
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals(0, source.socketSource.retries);
+ }
+
+ @Test
+ public void testSocketSourceRetryTenTimesWithFirstPass() throws Exception{
+ ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+
+ error.set(null);
+ ServerSocket serverSo = new ServerSocket(0);
+ SocketSource source = new SocketSource(serverSo, 10);
+ source.socketSource.CONNECTION_RETRY_SLEEP = 200;
+
+ assertEquals(0, source.socketSource.retries);
+
+ source.start();
+
+ Socket channel;
+ channel = serverSo.accept();
+ DataOutputStream dataOutputStream = new DataOutputStream(channel.getOutputStream());
+ dataOutputStream.write("testFirstSocketpass\n".getBytes());
+ channel.close();
+ serverSo.close();
+ while(source.socketSource.retries < 10){
+ long lastRetry = source.socketSource.retries;
+ sleep(100);
+ assertTrue(source.socketSource.retries >= lastRetry);
+ };
+ assertEquals(10, source.socketSource.retries);
+ source.cancel();
+
+ verify(ctx).collect(argument.capture());
+
+ if (error.get() != null) {
+ Throwable t = error.get();
+ t.printStackTrace();
+ fail("Error in spawned thread: " + t.getMessage());
+ }
+
+ assertEquals("testFirstSocketpass", argument.getValue());
+ assertEquals(10, source.socketSource.retries);
+ }
+}
\ No newline at end of file