Posted to issues@flink.apache.org by HuangWHWHW <gi...@git.apache.org> on 2015/08/06 11:07:09 UTC

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/992

    [FLINK-2490][FIX]Remove the retryForever check in function streamFrom…

    In the class SocketTextStreamFunction, the field retryForever is only set in the line "this.retryForever = maxRetry < 0;" (SocketTextStreamFunction.java:54).
    When the program enters the loop "while (retry < maxRetry && !success)", maxRetry must be greater than 0, so retryForever is always false there.
    So it is unnecessary to check whether retryForever is false.
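
The argument above can be checked with a small standalone sketch. Only the assignment `retryForever = maxRetry < 0` and the loop condition are taken from the description; the class name, the method name, and the simulated always-failing connection are assumptions for illustration and are not the Flink source.

```java
// Hypothetical stand-in for the retry loop described above; not the Flink source.
class RetryLoopSketch {

    /**
     * Runs the quoted loop condition with a connection that never succeeds and
     * returns how many loop iterations observed retryForever == true.
     */
    static int iterationsWithRetryForever(int maxRetry) {
        boolean retryForever = maxRetry < 0; // the only assignment, per the description
        int retry = 0;
        boolean success = false;             // assumption: every attempt fails
        int hits = 0;
        while (retry < maxRetry && !success) {
            if (retryForever) {
                // Not reachable: retry >= 0 and retry < maxRetry imply maxRetry > 0.
                hits++;
            }
            retry++;
        }
        return hits;
    }
}
```

For any value of maxRetry the method returns 0. With a negative maxRetry the loop body does not run at all under this condition, which is worth keeping in mind when testing the "retry forever" case.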

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2490

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #992
    
----
commit 25a41d0f462add2f11c6830ec8db518702f8dbb5
Author: HuangWHWHW <40...@qq.com>
Date:   2015-08-06T08:55:24Z

    [FLINK-2490][FIX]Remove the retryForever check in function streamFromSocket

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37844142
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(10000);
    +			assertTrue(source.socketSource.retries > lastRetry);
    --- End diff --
    
    Yes, exactly. Please see my previous edit.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-129875331
  
    @mxm 
    OK, I'll add a test.
    One small difficulty: I can't read the retry count in the test, since retry is a local variable.
    So may I add a method to get the retry count?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130663511
  
    OK, I added two cases (10 retries and 0 retries), since I thought retrying once behaves the same as retrying 10 times.
    Also, would you please take a look at two other tests (https://github.com/apache/flink/pull/991 and https://github.com/apache/flink/pull/977)?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37748844
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(10000);
    --- End diff --
    
    Did you have to add the sleep here to make the test pass? 10 seconds is quite long.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-135431535
  
    @mxm 
    BTW: could you help me take a look at the CI for https://github.com/apache/flink/pull/1030?
    I still cannot view the CI details.
    Thank you very much!



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37749598
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(10000);
    --- End diff --
    
    You might want to decrease the sleep time and do an `assertTrue(source.socketSource.retries >= lastRetry)`.
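
A minimal sketch of that suggestion, under stated assumptions: poll the counter with a short sleep, assert only that it never decreases, and bound the total wait with a timeout. The helper name `awaitRetries`, its parameters, and the use of an `AtomicInteger` for the counter are hypothetical; the pull request itself reads a plain `retries` field.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical test helper; names and signature are illustrative only.
class RetryPollingSketch {

    /**
     * Polls the counter every pollMillis until it reaches target or until
     * timeoutMillis has elapsed. On each poll it checks that the counter has
     * not decreased (the ">=" check). Returns true if target was reached.
     */
    static boolean awaitRetries(AtomicInteger retries, int target,
                                long pollMillis, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        int last = retries.get();
        while (last < target) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // target not reached within the timeout
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
            int now = retries.get();
            if (now < last) {
                throw new AssertionError("retry counter went backwards: " + last + " -> " + now);
            }
            last = now;
        }
        return true;
    }
}
```

With a short poll interval the test finishes shortly after the last retry instead of waiting a fixed 10 seconds per iteration, and a slow machine only makes it take longer rather than fail, as long as the timeout is generous.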



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130644950
  
    `StringBuilder` is intended for single-threaded use only, while `StringBuffer` is synchronized and allows access from multiple threads. If you use `StringBuffer` in a single-threaded scenario, it has worse performance than `StringBuilder`.
    
    Thanks for your changes. In addition to the "infinity" test, can you add a test that checks for a certain number of retries (e.g. 10)? Also, please add checks for 1 and 0 retries. It's always good to test corner cases :)
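
The difference between the two classes can be illustrated with a short sketch. The class and method names are hypothetical and unrelated to SocketTextStreamFunction; the example only relies on the documented behavior that `StringBuffer` methods are synchronized while `StringBuilder` methods are not.

```java
// Hypothetical illustration; not code from the pull request.
class BuilderVsBufferSketch {

    // Single-threaded assembly: StringBuilder performs no locking.
    static String joinWithBuilder(String[] parts) {
        StringBuilder sb = new StringBuilder();
        for (String p : parts) {
            sb.append(p);
        }
        return sb.toString();
    }

    // Several threads append to one shared StringBuffer. Each append is
    // synchronized, so the final length is threads * perThread.
    static int sharedBufferLength(int threads, int perThread) {
        final StringBuffer shared = new StringBuffer();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    shared.append('x');
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for appenders", e);
            }
        }
        return shared.length();
    }
}
```

When the text is assembled by a single thread, as in the first method, the locking done by `StringBuffer` buys nothing, which is the reason for preferring `StringBuilder` there.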



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37856724
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    I think you can decrease the waiting time even further to 100 ms. Otherwise, the test case will take 10 seconds which is quite long for one test case.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37937505
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -42,11 +42,13 @@
     	private boolean retryForever;
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
    -	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	public static int CONNECTION_RETRY_SLEEP = 1000;
    --- End diff --
    
    But if I add final, my test fails to compile with this error:
    Cannot assign a value to final variable "CONNECTION_RETRY_SLEEP".
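
One possible way around this compiler error, sketched under assumptions: keep the constant `final` as the default and let a test pass a shorter delay through an additional constructor. The class below is a hypothetical stand-in, not the SocketTextStreamFunction API, and it is not necessarily what the pull request ended up doing.

```java
// Hypothetical sketch; class, fields, and constructors are illustrative only.
class RetryingConnectorSketch {

    // The default stays a compile-time constant and cannot be reassigned.
    static final long DEFAULT_RETRY_SLEEP_MILLIS = 1000;

    private final int maxRetries;
    private final long retrySleepMillis;
    private int retries;

    RetryingConnectorSketch(int maxRetries) {
        this(maxRetries, DEFAULT_RETRY_SLEEP_MILLIS);
    }

    // Tests can call this constructor with a small delay instead of
    // mutating a static field.
    RetryingConnectorSketch(int maxRetries, long retrySleepMillis) {
        this.maxRetries = maxRetries;
        this.retrySleepMillis = retrySleepMillis;
    }

    /** Simulates attempts that always fail; returns the number of retries made. */
    int runFailingAttempts() {
        while (retries < maxRetries) {
            retries++;
            try {
                Thread.sleep(retrySleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop retrying when interrupted
                break;
            }
        }
        return retries;
    }
}
```

A test would then construct the object with, for example, `new RetryingConnectorSketch(10, 1)`, so ten retries take about 10 ms, while production code keeps the one-second default.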




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37042816
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -43,6 +43,7 @@
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
     	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	protected long retrys;
    --- End diff --
    
    ....
    Sorry for my English. :)



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36973377
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -43,6 +43,7 @@
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
     	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	protected long retrys;
    --- End diff --
    
    Sorry, minor thing but it should be `retries`.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-132982117
  
    Hi Max,
    I am very sorry to bother you.
    I fixed some of my PRs and have been waiting for your reply for several days.
    Also, following your advice, I added a change to the sink retry and tests for it.
    I would be very grateful if you could spend a little time looking at these PRs.
    Thank you!




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-128309202
  
    If you think it was necessary, why was your first step to remove its usage...



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r38904579
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.io.DataOutputStream;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    --- End diff --
    
    Could you replace this with a simple Mock using Mockito? 



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130626843
  
    Hi Max,
    I addressed all of your review comments.
    I also kept the change from StringBuffer to StringBuilder.
    One question: as far as I can see, StringBuilder currently does the same thing as StringBuffer.
    So what is the real difference between the two types in the SocketTextStreamFunction?
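
For reference, a minimal sketch of the point behind this question. `BuilderVsBuffer` is a hypothetical demo class, not Flink code. Both classes expose the same `append`/`toString` API and produce the same result; the difference is that `StringBuffer` methods are synchronized while `StringBuilder` methods are not, which only matters when several threads share one instance.

```java
// Sketch only: BuilderVsBuffer is a hypothetical demo class, not Flink code.
final class BuilderVsBuffer {

    // StringBuffer: every method is synchronized, so it is safe to share across threads.
    static String withBuffer() {
        StringBuffer buffer = new StringBuffer();
        buffer.append('a').append('\n').append('b');
        return buffer.toString();
    }

    // StringBuilder: same API without synchronization, intended for single-threaded use.
    static String withBuilder() {
        StringBuilder builder = new StringBuilder();
        builder.append('a').append('\n').append('b');
        return builder.toString();
    }

    public static void main(String[] args) {
        // Both variants build the same string.
        System.out.println(withBuffer().equals(withBuilder()));
    }
}
```

If the buffer is only ever touched by the thread that reads from the socket, the unsynchronized `StringBuilder` is usually sufficient.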



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37959366
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +
    +			Field field = SocketTextStreamFunction.class.getDeclaredField("CONNECTION_RETRY_SLEEP");
    +			field.setAccessible(true);
    +			Field modifiersField = Field.class.getDeclaredField("modifiers");
    +			modifiersField.setAccessible(true);
    +			modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    +			field.set(null, 200);
    --- End diff --
    
    That's quite a hack. I think it is ok to remove the `final` modifier and make the field variable package-local: `static int CONNECTION_RETRY_SLEEP = 1000`. Then you can set it directly here. 
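
A minimal sketch of this suggestion, using a hypothetical stand-in class `RetrySleepHolder` instead of the real `SocketTextStreamFunction`. The helper `totalWaitMillis` is also an assumption, added only to show the effect of the setting. Because the field is non-final and package-private, a test in the same package can assign it directly, with no reflection needed.

```java
// Sketch only: RetrySleepHolder is a hypothetical stand-in, not the Flink class.
final class RetrySleepHolder {

    // Non-final and package-private: visible to tests in the same package,
    // but not part of the public API.
    static int CONNECTION_RETRY_SLEEP = 1000;

    // Total time spent sleeping if every one of `retries` attempts waits once.
    static long totalWaitMillis(int retries) {
        return (long) retries * CONNECTION_RETRY_SLEEP;
    }

    public static void main(String[] args) {
        CONNECTION_RETRY_SLEEP = 200; // what a test would do before starting the source
        System.out.println(totalWaitMillis(10));
    }
}
```

A test that changes the value may want to restore the original afterwards, since the static field is shared by all tests running in the same JVM.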



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36955189
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -40,10 +37,12 @@
     	private char delimiter;
     	private long maxRetry;
     	private boolean retryForever;
    +	private boolean isRetrying = false;
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
     	private static final int CONNECTION_RETRY_SLEEP = 1000;
     
    +	private volatile boolean isExit = false;
    --- End diff --
    
    Is this flag necessary? We have `isRunning` already.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-135351424
  
    While merging your pull request I noticed that the `SocketTextStreamFunction` does not actually wait for the time specified in `CONNECTION_RETRY_SLEEP` but immediately tries to reconnect in case of an EOF. It only waits in case of a `ConnectionError`. I'm not sure whether this behavior is desired, but it should also be reflected in your test cases. Could you add a test case where you first pass a Socket with an EOF and then let a `ConnectionError` occur?
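
The two-phase scenario requested here can be sketched with plain JDK sockets. This is only an illustration under assumptions: `EofThenRefused` is a hypothetical helper, it does not use `SocketTextStreamFunction`, and it assumes the "ConnectionError" above corresponds to an `IOException` (typically `java.net.ConnectException`) thrown when nothing listens on the port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Sketch only: EofThenRefused is a hypothetical helper, not part of Flink.
final class EofThenRefused {

    // Returns {sawEof, connectFailed} for the two phases of the scenario.
    static boolean[] run() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        boolean sawEof = false;
        int port = -1;
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            port = server.getLocalPort();
            // Phase 1: accept one connection and close it, so the client reads EOF.
            try (Socket client = new Socket(loopback, port);
                 Socket accepted = server.accept()) {
                accepted.close();
                sawEof = client.getInputStream().read() == -1;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Phase 2: the server socket is closed now, so a reconnect attempt should fail.
        boolean connectFailed;
        try (Socket retry = new Socket(loopback, port)) {
            connectFailed = false;
        } catch (IOException e) {
            connectFailed = true; // typically a java.net.ConnectException
        }
        return new boolean[] {sawEof, connectFailed};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("EOF seen: " + result[0] + ", reconnect failed: " + result[1]);
    }
}
```

In a real test the client side would be the source under test, and the assertions would check whether it sleeps between attempts only in the second phase.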



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-133238272
  
    @tillrohrmann 
    Thank you for the info :-)



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-137372443
  
    I think there is an issue with Travis at the moment. Could you force push to this branch again?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37959211
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -42,11 +42,13 @@
     	private boolean retryForever;
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
    -	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	public static int CONNECTION_RETRY_SLEEP = 1000;
    --- End diff --
    
    Ok no problem. Then make the variable non-final but don't expose it. So just have it `static int CONNECTION_RETRY_SLEEP = 1000`.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-128307751
  
    Yes, I understand.
    But I think retryForever is necessary.
    Maybe there is a bug that keeps retryForever from working.
    I'll push another fix after the CI run.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r38414236
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -42,11 +42,13 @@
     	private boolean retryForever;
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
    -	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	static int CONNECTION_RETRY_SLEEP = 1000;
    +	protected long retries;
     
     	private volatile boolean isRunning;
     
     	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
    +		this.retries = 0;
    --- End diff --
    
    Hi,
    I removed this in my new commit.
    Also, why doesn't the CI run?
    This happens with all of my new commits.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-134567337
  
    Looks good to merge if we further adjust the waiting time of the tests.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-134526309
  
    Hi, I pushed a new change that decreases the sleep time and clears the error for each test case.
    But I have no idea how to control the increase of the retry value.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130228505
  
    Hi, there are two more questions:
    1. When using StringBuilder, does it mean that we should use BufferedReader.readLine() instead of BufferedReader.read()?
    2. Could you tell me how to make BufferedReader.read() return -1? I tried many ways, and all of them failed.
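
On the second question, one way to get `-1` from `BufferedReader.read()` is to have the peer close its side of the connection, which the reader then sees as end of stream. The sketch below assumes a hypothetical helper class `EofDemo` and uses only JDK classes. On the first question, the choice of `StringBuilder` is independent of whether `read()` or `readLine()` is used.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Sketch only: EofDemo is a hypothetical helper, not part of Flink.
final class EofDemo {

    // Connects to a local server socket, lets the server close the accepted
    // connection without sending data, and returns what read() yields.
    static int readAfterPeerClose() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            accepted.close(); // orderly close: the client now sees end of stream
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
            return reader.read();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterPeerClose());
    }
}
```

In a test for the socket source, the same effect comes from calling `close()` on the socket returned by `ServerSocket.accept()`.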



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-135262313
  
    Hi @mxm,
    any new comments?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130672880
  
    > Otherwise, I found the SocketClientSink didn`t have the "retry".
    Is it necessary to get a "retry"?
    
    Yes, that might be an issue but let's keep it separate from our concern here. If you want, you can open a JIRA issue for the missing retry option in the `SocketClientSink`.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37860526
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    Do you mean that I should change "private static final int CONNECTION_RETRY_SLEEP = 1000;" from my test?
    But how can I do that?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-128301542
  
    If you remove that check, retryForever is unused and can be removed completely.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-134570319
  
    Hi, I decreased both the waiting time and the number of retries, since the test would still take over 10 seconds if only the waiting time were decreased, due to the "Thread.sleep(CONNECTION_RETRY_SLEEP);".



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37859069
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    You might want to remove the private modifier from `CONNECTION_RETRY_SLEEP` and change it in the test.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r38909392
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.io.DataOutputStream;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    --- End diff --
    
    You're right, you need to capture the input to the collect method. How about using an `ArgumentCaptor`? http://mockito.googlecode.com/svn/tags/1.8.0/javadoc/org/mockito/ArgumentCaptor.html



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-128322151
  
    Hah...
    Sorry, I only had this thought after opening this PR.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-138608134
  
    Nice :) Merging...



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37749287
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(10000);
    +			assertTrue(source.socketSource.retries > lastRetry);
    --- End diff --
    
    Could you change this to `assertEquals(lastRetry+1, source.socketSource.retries)`?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-129667322
  
    @StephanEwen 
    Hi, yes, I plan to add a test for it.
    However, the test may fail, since retryForever currently does not work on the Flink master branch either.
    Will the test I add be run together with this PR?




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36955221
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -69,14 +68,14 @@ public void run(SourceContext<String> ctx) throws Exception {
     
     	public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
    --- End diff --
    
    I think this method should be private because it is not meant to be used outside this class.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36955259
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -69,14 +68,14 @@ public void run(SourceContext<String> ctx) throws Exception {
     
     	public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Exception {
     		try {
    -			StringBuffer buffer = new StringBuffer();
    +			StringBuilder buffer = new StringBuilder();
     			BufferedReader reader = new BufferedReader(new InputStreamReader(
     					socket.getInputStream()));
     
     			while (isRunning) {
    -				int data;
    +				String data;
     				try {
    -					data = reader.read();
    +					data = reader.readLine();
    --- End diff --
    
    Please use `read()` because of the custom delimiter.
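
The point about the custom delimiter can be illustrated with a minimal sketch. It assumes nothing about the actual SocketTextStreamFunction internals: the class `DelimiterReaderSketch` and the method `readRecords` are hypothetical names, and emitting a trailing, unterminated record is a choice made for this example only.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of SocketTextStreamFunction.
final class DelimiterReaderSketch {

	// Reads one character at a time and splits the input on the given delimiter.
	static List<String> readRecords(Reader in, char delimiter) throws IOException {
		List<String> records = new ArrayList<String>();
		StringBuilder buffer = new StringBuilder();
		BufferedReader reader = new BufferedReader(in);

		int data;
		// read() returns -1 once the end of the stream has been reached.
		while ((data = reader.read()) != -1) {
			if (data == delimiter) {
				records.add(buffer.toString());
				buffer.setLength(0);
			} else {
				buffer.append((char) data);
			}
		}
		// Assumption of this sketch: keep a trailing record without a delimiter.
		if (buffer.length() > 0) {
			records.add(buffer.toString());
		}
		return records;
	}

	public static void main(String[] args) throws IOException {
		// With ';' as the delimiter, the newline stays inside the second record.
		System.out.println(readRecords(new StringReader("a;b\nc;d"), ';'));
	}
}
```

With `readLine()` the same input would be split at the newline instead, which is why it only fits when the delimiter is `\n`.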



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36955316
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -85,11 +84,12 @@ public void streamFromSocket(SourceContext<String> ctx, Socket socket) throws Ex
     					}
     				}
     
    -				if (data == -1) {
    +				if (data == null) {
     					socket.close();
     					long retry = 0;
     					boolean success = false;
    -					while (retry < maxRetry && !success) {
    +					while ((retry < maxRetry || (retryForever && !isExit)) && !success) {
    +						isRetrying = true;
    --- End diff --
    
    This flag is only necessary for your test and thus should be removed.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36955334
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -145,4 +152,8 @@ public void cancel() {
     			}
     		}
     	}
    +
    +	public boolean getIsRetrying() {
    --- End diff --
    
    Please remove this getter.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-134493928
  
    @mxm 
    It doesn't matter.
    I'll make a new change.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130311909
  
    Thank you!
    I'll try again.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-135419765
  
    @mxm 
    Hi,
    my test does the same thing: it first accepts the socket connection that is made
    when SocketTextStreamFunction.open is called.
    Then it closes the socket, which sends nothing but EOF to the SocketTextStreamFunction.
    After this, the function retries.
    The first retry happens immediately, because SocketTextStreamFunction has received the end of the sent message.
    The second, third, ... retries follow because I do not reopen the socket, so each attempt fails with a connection error.
    Could you describe the problem in more detail?
    Thank you.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-134797100
  
    @mxm 
    Hi, I changed CONNECTION_RETRY_SLEEP to static final int CONNECTION_RETRY_SLEEP = 1000;
    But I do not know how to change CONNECTION_RETRY_SLEEP directly in my test with:
    SocketTextStreamFunction.CONNECTION_RETRY_SLEEP = 200.
    So I added a reflection mechanism to work around this.
    Now CONNECTION_RETRY_SLEEP is changed to 200 in my test.
    Would you please take a look and check whether this is correct?
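
For readers unfamiliar with the reflection approach mentioned here, the following is a rough sketch and not the code from this PR. `RetrySettingsSketch` and `ReflectionOverrideSketch` are hypothetical classes, and the field is deliberately declared non-final: as far as I know, a `static final` primitive initialized with a constant is inlined by the compiler and rejected by `Field.setInt`, so the sketch does not cover that case.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a class that holds a retry interval.
final class RetrySettingsSketch {
	// Non-final on purpose, so that a test can override it.
	static int CONNECTION_RETRY_SLEEP = 1000;
}

// Hypothetical test helper that overrides a static int field by name.
final class ReflectionOverrideSketch {

	static void setStaticInt(Class<?> clazz, String fieldName, int value) throws Exception {
		Field field = clazz.getDeclaredField(fieldName);
		// Needed when the field is not accessible from the calling class.
		field.setAccessible(true);
		// The target object is null because the field is static.
		field.setInt(null, value);
	}

	public static void main(String[] args) throws Exception {
		setStaticInt(RetrySettingsSketch.class, "CONNECTION_RETRY_SLEEP", 200);
		System.out.println(RetrySettingsSketch.CONNECTION_RETRY_SLEEP);
	}
}
```

A test that changes shared static state this way would normally restore the old value afterwards, so that later tests see the default again.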



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37860800
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    Or make a permanent change, from
    Thread.sleep(CONNECTION_RETRY_SLEEP);
    to
    synchronized (lock){
    lock.wait(CONNECTION_RETRY_SLEEP);
    }
    ?
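
A small sketch of how a timed `wait` could differ from `Thread.sleep` in practice: the pause can be cut short by a `notifyAll` from `cancel()`. The class `InterruptibleRetrySleepSketch` and its `pause` and `cancel` methods are hypothetical and only assume a `lock` object and an `isRunning` flag like the ones discussed in this thread.

```java
// Hypothetical illustration; not the actual SocketTextStreamFunction code.
final class InterruptibleRetrySleepSketch {

	private final Object lock = new Object();
	private volatile boolean isRunning = true;

	// Waits up to timeoutMillis, or returns early once cancel() has been called.
	// Returns the elapsed time in milliseconds.
	long pause(long timeoutMillis) throws InterruptedException {
		long start = System.nanoTime();
		long deadline = start + timeoutMillis * 1000000L;
		synchronized (lock) {
			// Loop to guard against spurious wakeups.
			while (isRunning) {
				long remaining = (deadline - System.nanoTime()) / 1000000L;
				if (remaining <= 0) {
					break; // wait(0) would block indefinitely, so stop here
				}
				lock.wait(remaining);
			}
		}
		return (System.nanoTime() - start) / 1000000L;
	}

	// Stops the source and wakes up a thread that is currently pausing.
	void cancel() {
		isRunning = false;
		synchronized (lock) {
			lock.notifyAll();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		final InterruptibleRetrySleepSketch sketch = new InterruptibleRetrySleepSketch();
		new Thread(new Runnable() {
			@Override
			public void run() {
				sketch.cancel();
			}
		}).start();
		System.out.println("Waited about " + sketch.pause(5000) + " ms");
	}
}
```

The benefit for a test is that cancelling the source no longer has to wait for the full retry interval to elapse.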



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130999638
  
    Hi, I made a new change.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130206658
  
    @mxm 
    @StephanEwen 
    Hi, I wrote a test for this today and ran into another problem.
    SocketTextStreamFunction uses BufferedReader.read() to read the data sent by the socket server.
    Could it be that BufferedReader.read() never returns -1 at the end of the sent message?
    If so, there would be another bug, because the following code would never be reachable:
    
    if (data == -1) {
    					socket.close();
    					long retry = 0;
    					boolean success = false;
    					while ((retry < maxRetry || retryForever) && !success) {
    						if (!retryForever) {
    							retry++;
    						}
    						LOG.warn("Lost connection to server socket. Retrying in "
    								+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
    						try {
    							socket = new Socket();
    							socket.connect(new InetSocketAddress(hostname, port),
    									CONNECTION_TIMEOUT_TIME);
    							success = true;
    						} catch (ConnectException ce) {
    							Thread.sleep(CONNECTION_RETRY_SLEEP);
    							socket.close();
    						}
    					}
    
    					if (success) {
    						LOG.info("Server socket is reconnected.");
    					} else {
    						LOG.error("Could not reconnect to server socket.");
    						break;
    					}
    					reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    					continue;
    				}



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36973279
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retrys < 10);
    +		sleep(10000);
    --- End diff --
    
    You could check again if the retry count increased in the meantime.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130237642
  
    > 1.In using StringBuilder, does it mean that we should use BufferedReader.readLine() instead of BufferedReader.read()?
    
    Reading by character is the way to go if we use a custom `delimiter`. If our delimiter was `\n` then it would be ok to read entire lines.
    
    >  Could you tell me how to make the BufferedReader.read() return -1? I tried many ways that all failed.
    
    Ok :) Here is a minimal working example where `read()` returns `-1`:
    
    ```java
    public static void main(String[] args) throws IOException {
    
    	ServerSocket socket = new ServerSocket(12345);
    
    	final SocketAddress socketAddress = socket.getLocalSocketAddress();
    
    	new Thread(new Runnable() {
    		@Override
    		public void run() {
    			Socket socket = new Socket();
    
    			try {
    				socket.connect(socketAddress);
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    
    			try {
    				BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    				System.out.println((bufferedReader.read()));
    			} catch (IOException e) {
    				e.printStackTrace();
    			}
    		}
    	}).start();
    
    	Socket channel = socket.accept();
    
    	channel.close();
    }
    ```
    Output:
    ```
    -1
    ```



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-134199343
  
    @HuangWHWHW Sorry for keeping you waiting. I've made some more comments. Otherwise, I think this looks ready to be merged.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-138518846
  
    @mxm
    Hi, I added the ArgumentCaptor to the test and removed the unwanted code.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37762121
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    --- End diff --
    
    You should clear the error for subsequent test cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37748868
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    --- End diff --
    
    Did you have to add the sleep here to make the test pass? 10 seconds is quite long.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37858063
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    So maybe I can decrease the number of retries to 2?
    The 10 seconds comes from the "Thread.sleep(CONNECTION_RETRY_SLEEP);" call in SocketTextStreamFunction.java.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r38906129
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.io.DataOutputStream;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    --- End diff --
    
    The ctx is used to capture the message received from the socket server,
    which is why I override the toString() method in ctx.
    I don't think using Mockito would work for checking that the received message is correct.
    Or is it unnecessary to check the message, since this test is about the number of retries?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r38409485
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -42,11 +42,13 @@
     	private boolean retryForever;
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
    -	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	static int CONNECTION_RETRY_SLEEP = 1000;
    +	protected long retries;
     
     	private volatile boolean isRunning;
     
     	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
    +		this.retries = 0;
    --- End diff --
    
    Initialization to `0` is not needed.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130664351
  
    @mxm 
    Also, I found that SocketClientSink doesn't have a "retry" mechanism.
    Is it necessary to add one?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130217420
  
    @mxm 
    Hi, thank you for the suggestions.
    I will try to follow them and improve the test.
    I understand most of them, and I also read the class documentation for BufferedReader.read().
    While writing the test, I found that BufferedReader.read() does not return until it reads the next char from the socket server, or it throws an exception when the socket is closed.
    Returning -1 from BufferedReader.read() seems to work only for text files, not for socket messages.
    I looked for help online, and some people suggested calling Socket.setSoTimeout() so that BufferedReader.read() would stop.
    But this approach is not satisfactory either, since it throws an exception.
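The two behaviours discussed here can be observed with a small, self-contained sketch. It is not part of the pull request; the class and method names are hypothetical. It assumes a loopback `ServerSocket` on an ephemeral port and shows that `read()` returns -1 once the peer has closed the connection cleanly, while `setSoTimeout()` turns a silent but open connection into a `SocketTimeoutException`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, for illustration only.
final class SocketReadSketch {

    /** Returns what read() yields after the peer closes the connection without sending data. */
    static int readAfterPeerClose() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
            server.accept().close(); // the peer closes its side right away
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
            return reader.read(); // end of stream is reported as -1
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if read() timed out while the peer kept the connection open but silent. */
    static boolean readTimesOut(int timeoutMillis) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
             Socket peer = server.accept()) {
            client.setSoTimeout(timeoutMillis);
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
            try {
                reader.read();
                return false; // data or end of stream arrived before the timeout
            } catch (SocketTimeoutException e) {
                return true; // nothing arrived within timeoutMillis
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Under these assumptions, a reader that must notice a closed peer can rely on the -1 return value, and a timeout is only needed when the peer stays connected without sending anything.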
    




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37845490
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(10000);
    +			assertTrue(source.socketSource.retries > lastRetry);
    --- End diff --
    
    OK, I will add an "assert" in testSocketSourceRetryForever.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-138268476
  
    @mxm 
    @StephanEwen 
    Hi, sorry to bother you.
    The CI now passes.
    Are there any new comments, or can this be merged?




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-134902558
  
    @HuangWHWHW Thanks for your changes. Adding reflection calls to the test code is not good practice and makes the code hard to maintain.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-135429969
  
    @mxm 
    Hi,
    I added a test that first sends a message to the SocketTextStreamFunction, which succeeds.
    Then I close the server so that the SocketTextStreamFunction retries.
    Is this what you need?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-136739055
  
    I am not sure why the CI is not retesting this.
    
    Can you try to squash your commits into one commit and force-push this branch? This always triggers CI for me...



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37860912
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    Remove the private modifier in `SocketTextStreamFunction`: `static final int CONNECTION_RETRY_SLEEP = 1000;`. That way, the variable can be changed by other classes within the same Java package. In your test, set the sleep interval using e.g. `SocketTextStreamFunction.CONNECTION_RETRY_SLEEP = 200`.
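As an illustration of the package-visible approach, here is a minimal sketch with a hypothetical stand-in class rather than Flink's actual `SocketTextStreamFunction`. One assumption is worth stating loudly: Java rejects assignment to a `final` field, so the sketch declares the field without `final`. The test shortens the interval and restores it afterwards so that other tests are not affected.

```java
// Hypothetical demo class, for illustration only.
final class RetrySleepSketch {

    /** Hypothetical stand-in for the source function; not Flink's actual class. */
    static class Source {
        // Package-private and non-final so a test in the same package can shorten it.
        static int CONNECTION_RETRY_SLEEP = 1000;

        /** Total time spent sleeping across the given number of retries. */
        static long totalSleepMillis(int retries) {
            return (long) retries * CONNECTION_RETRY_SLEEP;
        }
    }

    public static void main(String[] args) {
        int original = Source.CONNECTION_RETRY_SLEEP;
        try {
            Source.CONNECTION_RETRY_SLEEP = 200;              // speed up the test run
            System.out.println(Source.totalSleepMillis(10));  // prints 2000
        } finally {
            Source.CONNECTION_RETRY_SLEEP = original;         // restore for other tests
        }
    }
}
```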



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37879401
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -42,11 +42,13 @@
     	private boolean retryForever;
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
    -	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	public static int CONNECTION_RETRY_SLEEP = 1000;
    --- End diff --
    
    This shouldn't be modifiable by everyone. Please make it just package-visible by removing the `public` modifier. Also, please keep the `final` modifier because the current implementation just lets the number of retries be configurable with a fixed 1 second retry rate. This is also documented in the user-facing API methods on DataStream.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-129864531
  
    @HuangWHWHW `retryForever` is just a convenience variable for `maxRetry < 0`. Your fix is correct because the loop will only execute if `maxRetry > 0` and thus not execute at all if it should retry "forever". It would be great if you added a test that checks for the correct number of retries. In case of infinite retries, just check up to a certain number (e.g. 100 retries).
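
The reasoning above can be checked with a small sketch. It is not the code from `SocketTextStreamFunction`; the class and method names are hypothetical, and only the loop condition `retry < maxRetry && !success` and the definition `retryForever = maxRetry < 0` are taken from the discussion.

```java
public class RetryConditionSketch {

    // Returns true if the body of the bounded retry loop is ever entered
    // while retryForever is set. Hypothetical helper for illustration only.
    static boolean boundedLoopSeesRetryForever(int maxRetry) {
        boolean retryForever = maxRetry < 0;
        boolean success = false; // stays false so the loop runs as long as it can
        int retry = 0;
        while (retry < maxRetry && !success) {
            if (retryForever) {
                return true; // unreachable: entering the loop implies maxRetry > 0
            }
            retry++;
        }
        return false;
    }
}
```

For any `maxRetry`, the method returns `false`: a negative value never enters the loop, and a positive value never sets `retryForever`.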



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130213092
  
    @HuangWHWHW The `read()` method of the `BufferedReader` object returns `-1` when the end of the stream has been reached.
    
    A couple of things I noticed apart from the `retryForever` issue. I wonder if we can fix these with this pull request as well:
    
    1. The control flow of the `streamFromSocket` function is hard to predict because there are many `while` loops with `break`, `continue`, or `throw` statements.
    2. We could use `StringBuilder` instead of `StringBuffer` in this class. `StringBuilder` is faster in the case of single-threaded access.
    3. The function reads a single character at a time from the socket. It is more efficient to use a buffer and read several characters at once.
    
    @HuangWHWHW You asked how you could count the number of retries in a unit test. Typically, you would insert a `Mock` or a `Spy` into your test method. Unfortunately, this does not work here because the socket variable is overwritten in case of a retry. So for this test, I would recommend creating a local `ServerSocket` and letting the function connect to this socket. You can then control the failures from your test socket.
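
A minimal sketch of the local `ServerSocket` approach, assuming Java 8. The class name, the `acceptAndDrop` helper, and the 5 second accept timeout are illustrative assumptions; a plain client thread stands in for the source function, which in the real test would be a `SocketTextStreamFunction` connecting to `server.getLocalPort()`.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class LocalServerSocketSketch {

    // Accepts `expected` connections on an ephemeral local port and closes
    // each one immediately, so the test side decides when a connection fails.
    static int acceptAndDrop(int expected) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {
            server.setSoTimeout(5000); // accept() throws instead of blocking forever
            final int port = server.getLocalPort();

            // Stand-in for the code under test: opens one connection per expected accept.
            Thread client = new Thread(() -> {
                for (int i = 0; i < expected; i++) {
                    try (Socket socket = new Socket("127.0.0.1", port)) {
                        // Nothing is sent; the connection is closed again right away.
                    } catch (IOException e) {
                        return; // the accept timeout on the server side reports the failure
                    }
                }
            });
            client.start();

            int accepted = 0;
            while (accepted < expected) {
                try (Socket channel = server.accept()) {
                    accepted++;
                }
            }
            client.join();
            return accepted;
        }
    }
}
```

Binding to port `0` lets the operating system pick a free port, which avoids clashes between tests that run in parallel.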



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37843364
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(10000);
    +			assertTrue(source.socketSource.retries > lastRetry);
    --- End diff --
    
    Changing this to assertEquals(lastRetry+1, source.socketSource.retries) may be somewhat difficult.
    It's hard to control the exact value of source.socketSource.retries.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130222078
  
    Actually point 3 is not so bad because we're using a buffered reader that fills the buffer and does not read a character from the socket on every call to `read()`.
    
    The `read()` method may throw an Exception or return -1. So we need to handle both of these cases. If closed properly, the socket should send the EOF event and the `read()` method returns -1.
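
A rough sketch of a read loop that handles both cases, assuming records are separated by a single delimiter character. The class and method names are hypothetical, and emitting a trailing partial record at end of stream is an assumption made for the example, not necessarily what `streamFromSocket` does.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

public class DelimitedReadSketch {

    // Reads delimiter-separated records until read() signals end of stream
    // with -1. An IOException thrown by read() propagates to the caller,
    // which can then decide whether to reconnect.
    static List<String> readRecords(Reader in, char delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        BufferedReader reader = new BufferedReader(in); // fills its buffer in bulk

        int data;
        while ((data = reader.read()) != -1) {
            if (data == delimiter) {
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append((char) data);
            }
        }
        // End of stream reached: keep whatever was read after the last delimiter.
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }
}
```

Because the `BufferedReader` refills its internal buffer in larger chunks, calling `read()` per character does not mean one socket read per character.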



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/992



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37861790
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    Ah, I see.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130524804
  
    @mxm 
    Hi, I fixed the StringBuffer and added the test.
    Please take a look and check whether it's correct.
    Thank you!



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37963338
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,189 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) throws Exception {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +
    +			Field field = SocketTextStreamFunction.class.getDeclaredField("CONNECTION_RETRY_SLEEP");
    +			field.setAccessible(true);
    +			Field modifiersField = Field.class.getDeclaredField("modifiers");
    +			modifiersField.setAccessible(true);
    +			modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    +			field.set(null, 200);
    --- End diff --
    
    ...
    Sorry for that.
    I changed this in my new one.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36973430
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---
    @@ -43,6 +43,7 @@
     	private Socket socket;
     	private static final int CONNECTION_TIMEOUT_TIME = 0;
     	private static final int CONNECTION_RETRY_SLEEP = 1000;
    +	protected long retrys;
    --- End diff --
    
    You should also initialize the variable with 0 in the `open()` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37748808
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(10000);
    +			assertTrue(source.socketSource.retries > lastRetry);
    +		};
    +		assertEquals(10, source.socketSource.retries);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(10, source.socketSource.retries);
    +	}
    +
    +	@Test
    +	public void testSocketSourceNeverRetry() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 0);
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		sleep(10000);
    --- End diff --
    
    Did you have to add the sleep here to make the test pass? 10 seconds is quite long.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r36973254
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +		}
    +		sleep(10000);
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retrys < 10);
    --- End diff --
    
    You're "busy waiting" here which consumes a lot of CPU. You could check in regular intervals and sleep in between.
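
One way to replace the busy wait is a small polling helper with a timeout, sketched below. The class name, the `waitUntil` helper, and its parameters are hypothetical; it assumes Java 8 for `BooleanSupplier`.

```java
import java.util.function.BooleanSupplier;

public class PollingSketch {

    // Checks the condition at regular intervals and sleeps in between.
    // Returns true once the condition holds, or false if the timeout passes first.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long intervalMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // give up instead of spinning forever
            }
            Thread.sleep(intervalMillis);
        }
        return true;
    }
}
```

In the test this could be used as `assertTrue(PollingSketch.waitUntil(() -> source.socketSource.retrys >= 10, 30000, 100));`, where the 30 second timeout and 100 ms interval are arbitrary example values. The timeout also keeps the test from hanging if the counter never reaches the target.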



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-136910004
  
    @mxm @StephanEwen 
    Hi, I pushed one new commit yesterday.
    I noticed that a few other PRs ran into the same trouble yesterday.
    Is there an issue with the CI?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/992#discussion_r37864215
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.source;
    +
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction}.
    + */
    +public class SocketTextStreamFunctionTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +
    +	SourceFunction.SourceContext<String> ctx = new SourceFunction.SourceContext<String>() {
    +		public String result;
    +
    +		@Override
    +		public void collect(String element) {
    +			result = element;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return this.result;
    +		}
    +
    +		@Override
    +		public void collectWithTimestamp(String element, long timestamp) {
    +
    +		}
    +
    +		@Override
    +		public void emitWatermark(Watermark mark) {
    +
    +		}
    +
    +		@Override
    +		public Object getCheckpointLock() {
    +			return null;
    +		}
    +
    +		@Override
    +		public void close() {
    +
    +		}
    +	};
    +
    +	public SocketTextStreamFunctionTest() {
    +	}
    +
    +	class SocketSource extends Thread {
    +
    +		SocketTextStreamFunction socketSource;
    +
    +		public SocketSource(ServerSocket serverSo, int maxRetry) {
    +			this.socketSource =  new SocketTextStreamFunction(host, serverSo.getLocalPort(), '\n', maxRetry);
    +		}
    +
    +		public void run() {
    +			try {
    +				this.socketSource.open(new Configuration());
    +				this.socketSource.run(ctx);
    +			}catch(Exception e){
    +				error.set(e);
    +			}
    +		}
    +
    +		public void cancel(){
    +			this.socketSource.cancel();
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSourceRetryForever() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, -1);
    +		source.start();
    +
    +		int count = 0;
    +		Socket channel;
    +		while (count < 100) {
    +			channel = serverSo.accept();
    +			count++;
    +			channel.close();
    +			assertEquals(0, source.socketSource.retries);
    +		}
    +		source.cancel();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(100, count);
    +	}
    +
    +	@Test
    +	 public void testSocketSourceRetryTenTimes() throws Exception{
    +		error.set(null);
    +		ServerSocket serverSo = new ServerSocket(0);
    +		SocketSource source = new SocketSource(serverSo, 10);
    +
    +		assertEquals(0, source.socketSource.retries);
    +
    +		source.start();
    +
    +		Socket channel;
    +		channel = serverSo.accept();
    +		channel.close();
    +		serverSo.close();
    +		while(source.socketSource.retries < 10){
    +			long lastRetry = source.socketSource.retries;
    +			sleep(1000);
    --- End diff --
    
    Hi, I changed it in my new commit.



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-130588652
  
    Thanks for your changes. I think we should use `read()` instead of `readLine()` because we are using a custom delimiter and not necessarily "\n" (newline symbol). The danger of reading an entire line is that the newline symbol might never arrive. So it might continue to read forever. And even if it manages to find a newline symbol, you have to truncate your input to find the custom delimiter. That's not very efficient. Can you change the code back to using the `read()` method? I think we had a misunderstanding.
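
The point about `read()` can be sketched as follows: consume one character at a time and cut a record whenever the custom delimiter appears, so a newline in the input is treated as ordinary data. `DelimitedReader` and `readRecords` are hypothetical names, and emitting a trailing record that has no delimiter is a choice made for this sketch, not necessarily what `SocketTextStreamFunction` does.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class DelimitedReader {

    // Splits the character stream on `delimiter` using read(), never readLine().
    // A final record without a trailing delimiter is returned as well (sketch-only choice).
    static List<String> readRecords(Reader in, char delimiter) {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        try {
            int data;
            while ((data = in.read()) != -1) {
                if (data == delimiter) {
                    records.add(buffer.toString());
                    buffer.setLength(0); // start the next record
                } else {
                    buffer.append((char) data);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        // The newline is kept inside the second record because '|' is the delimiter.
        System.out.println(readRecords(new StringReader("a|b\nc|d"), '|'));
    }
}
```

For a socket, the `Reader` would typically wrap the socket's input stream in a `BufferedReader`, so per-character `read()` calls are served from the buffer.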
    
    For your test case: It's not considered good practice to mix production and test code. You're doing that by introducing the `isRetrying` flag and exposing it. Alternatively, you have two options:
    
    1. Create a `ServerSocket` and pass its address to the `SocketTextStreamFunction`. Then control the connection to this socket and count how often the function reconnects (e.g. use the `accept()` method).
    2. Create your test in the same package as the `SocketTextStreamFunction` class (the package is `org.apache.flink.streaming.api.functions.source`). Then you can access all of its protected fields. So make your `retries` variable a protected field of the `SocketTextStreamFunction` class.
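
Option 1 can be sketched roughly as below: the test owns a `ServerSocket`, closes every accepted connection at once, and counts how often a client comes back. The reconnecting client here is a hypothetical stand-in for the function under test, and all names (`ReconnectCounter`, `countConnections`, `startReconnectingClient`, `demo`) and timeout values are assumptions for illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class ReconnectCounter {

    // Accepts connections and closes each one immediately. Returns how many were
    // accepted before `expected` was reached or accept() timed out.
    static int countConnections(ServerSocket server, int expected, int acceptTimeoutMillis) {
        int accepted = 0;
        try {
            server.setSoTimeout(acceptTimeoutMillis);
            while (accepted < expected) {
                try (Socket channel = server.accept()) {
                    accepted++; // closing the channel forces the client to reconnect
                }
            }
        } catch (IOException e) {
            // Timeout or socket error: fall through and report the count so far.
        }
        return accepted;
    }

    // Hypothetical stand-in for the source under test: connect, wait until the
    // peer closes the connection, then connect again.
    static Thread startReconnectingClient(int port, int attempts) {
        Thread client = new Thread(() -> {
            for (int i = 0; i < attempts; i++) {
                try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port)) {
                    while (socket.getInputStream().read() != -1) {
                        // discard data until end of stream
                    }
                } catch (IOException e) {
                    // Connection failed or was reset: move on to the next attempt.
                }
            }
        });
        client.setDaemon(true);
        client.start();
        return client;
    }

    // Runs server and client together on an ephemeral loopback port.
    static int demo(int attempts) {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            startReconnectingClient(server.getLocalPort(), attempts);
            return countConnections(server, attempts, 2000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted=" + demo(5));
    }
}
```

Counting on the server side keeps the production class free of test-only fields, which is the concern raised about the `isRetrying` flag.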
    
    I hope that this helps you. If not, feel free to ask more questions.




[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-132999197
  
    @HuangWHWHW, Max is currently on vacation. He'll be back next week. I'm sure that he'll get back to you then :-)



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-137745160
  
    @mxm
    OK, I triggered a CI rerun.
    Any new comments?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-136285250
  
    @mxm
    Hi, Max.
    Any comments on my new changes?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-129571942
  
    This fix looks valid.
    
    Can it be included in an extended test for the socket function? Something that validates that the function properly tries to reconnect?



[GitHub] flink pull request: [FLINK-2490][FIX]Remove the retryForever check...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/992#issuecomment-136754340
  
    @HuangWHWHW Thank you for addressing my comments. Could you please squash your commits and force push to this branch again?

