Posted to issues@flink.apache.org by HuangWHWHW <gi...@git.apache.org> on 2015/08/17 11:34:06 UTC

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/1030

    [FLINK-2536][streaming]add a re-connect for socket sink

    Add a reconnect in the invoke() function when it throws an exception.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2536

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1030
    
----
commit 85d5bb50419d6b803a9fc966dd4f95fcd042e21c
Author: HuangWHWHW <40...@qq.com>
Date:   2015-08-17T09:32:04Z

    [FLINK-2536][streaming]add a re-connect for socket sink

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-141904245
  
    Hi, very sorry to bother you again.
    Two weeks have passed; would you have some time to review this PR soon?
    I would greatly appreciate it :)



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748000
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    --- End diff --
    
    Yes, I will move it outside.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37745586
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    +							try {
    +								//socket is closed then test "retry"
    +								simpleSink.invoke("testSocketSinkInvoke");
    +								simpleSink.close();
    +							} catch (Exception e) {
    +								error.set(e);
    +							}
    +						}
    +
    +					}).start();
    +
    +					//set a new server to let the retry success.
    +					while (simpleSink.retries == 0){
    +						synchronized (simpleSink) {
    +							simpleSink.notifyAll();
    +						}
    +					}
    +
    +					//reopen socket server for sink access
    +					value = "";
    +					ServerSocket server = new ServerSocket(port);
    --- End diff --
    
    How do you make sure that the `SocketClientSink` has not been terminated before you open the server socket?
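
One possible way to address this question, shown as a minimal sketch rather than the approach the PR takes: have the retrying client signal a `CountDownLatch` after its first failed attempt, and give it a retry budget long enough that it cannot give up before the server is reopened. The class `RetryHandshakeSketch`, the method `run()`, and the 100 x 50 ms retry budget are hypothetical names and values chosen for illustration; the real `SocketClientSink` exposes no such hook.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; a plain socket client stands in for SocketClientSink. */
class RetryHandshakeSketch {

    /** Returns true if the retrying client connected after the server was reopened. */
    static boolean run() throws Exception {
        // Pick a free port, then close the socket so the first connect attempt fails.
        ServerSocket probe = new ServerSocket(0);
        final int port = probe.getLocalPort();
        probe.close();

        final CountDownLatch retrying = new CountDownLatch(1);

        Thread client = new Thread(() -> {
            // Assumed retry budget: 100 attempts, 50 ms apart (about 5 seconds).
            for (int attempt = 0; attempt < 100; attempt++) {
                try (Socket s = new Socket("127.0.0.1", port)) {
                    return; // connected, stop retrying
                } catch (IOException e) {
                    // Signal that at least one attempt has failed and the loop is retrying.
                    retrying.countDown();
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException ie) {
                        return;
                    }
                }
            }
        });
        client.start();

        // Reopen the server only once the client is known to be in its retry loop.
        if (!retrying.await(5, TimeUnit.SECONDS)) {
            return false;
        }

        try (ServerSocket server = new ServerSocket(port)) {
            server.setSoTimeout(5000); // fail instead of hanging if the client gave up
            try (Socket accepted = server.accept()) {
                client.join(5000);
                return !client.isAlive();
            }
        }
    }
}
```

The latch only proves that retrying has started; it is the bounded but generous retry budget that keeps the client alive until the server socket is bound again.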



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748179
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    +							try {
    +								//socket is closed then test "retry"
    +								simpleSink.invoke("testSocketSinkInvoke");
    +								simpleSink.close();
    +							} catch (Exception e) {
    +								error.set(e);
    +							}
    +						}
    +
    +					}).start();
    +
    +					//set a new server to let the retry success.
    +					while (simpleSink.retries == 0){
    +						synchronized (simpleSink) {
    +							simpleSink.notifyAll();
    --- End diff --
    
    In SocketClientSink.java:121:

        synchronized (this) {
            this.wait(CONNECTION_RETRY_SLEEP);
        }



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134538764
  
    @tillrohrmann
    BTW: how can I make the CI rerun?



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38410693
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			LOG.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					retries++;
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +							ee.toString() + ". Retry time(s):" + retries);
    +
    +					if (lock == null) {
    --- End diff --
    
    The idea was to speed up the proper termination of the `SocketClientSink` upon calling `closeConnection`. Otherwise one would have to wait a complete `CONNECTION_RETRY_SLEEP` cycle.
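
The termination idea described above can be sketched roughly as follows. This is an illustration under assumptions, not the code in the PR: the class `RetrySleepSketch`, its methods, and the 5000 ms value for `CONNECTION_RETRY_SLEEP` are hypothetical, and `close()` stands in for `closeConnection`. The retry loop waits on a lock with a timeout, and closing flips the running flag and notifies the lock so the waiting thread wakes up at once.

```java
/** Hypothetical sketch of an interruptible retry sleep; not the actual SocketClientSink. */
class RetrySleepSketch {

    // Assumed value for illustration only.
    static final long CONNECTION_RETRY_SLEEP = 5000L;

    private final Object lock = new Object();
    private volatile boolean isRunning = true;
    private int retries = 0;

    /** Simulates failed reconnect attempts until close() is called; returns the attempt count. */
    int retryUntilStopped() throws InterruptedException {
        while (isRunning) {
            retries++; // a real sink would try to reconnect and resend here
            synchronized (lock) {
                // Re-check under the lock so a close() issued just before wait() is not missed.
                if (isRunning) {
                    lock.wait(CONNECTION_RETRY_SLEEP);
                }
            }
        }
        return retries;
    }

    /** Stops the retry loop without waiting out the remaining sleep interval. */
    void close() {
        isRunning = false;
        synchronized (lock) {
            lock.notifyAll();
        }
    }
}
```

With a plain `Thread.sleep(CONNECTION_RETRY_SLEEP)` in place of the timed `wait`, a caller of `close()` could be left waiting for up to one full sleep interval before the loop notices the flag.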



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138836978
  
    Sorry, @HuangWHWHW, currently we're really busy. I'll try to review your PR once I find a free minute.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138843852
  
    No worries. We are simply overloaded right now. Many hard features under development, and many pull requests being opened.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138832801
  
    @StephanEwen 
    @tillrohrmann 
    Hello?



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37751617
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    +							try {
    +								//socket is closed then test "retry"
    +								simpleSink.invoke("testSocketSinkInvoke");
    +								simpleSink.close();
    +							} catch (Exception e) {
    +								error.set(e);
    +							}
    +						}
    +
    +					}).start();
    +
    +					//set a new server to let the retry success.
    +					while (simpleSink.retries == 0){
    +						synchronized (simpleSink) {
    +							simpleSink.notifyAll();
    --- End diff --
    
    Ah, I see. I think we should not lock on the instance itself. It would be better not to expose the lock to the outside world.
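
The suggestion above can be sketched as follows. This is a minimal illustration, not the PR's code: `PrivateLockSink`, `recordRetry()` and `getRetries()` are hypothetical names. The monitor is a private field, so a test cannot call `synchronized (sink)` or `sink.notifyAll()` to interfere with it, and the retry counter is read through an accessor instead of a public field.

```java
/** Hypothetical sink-like class; the names are illustrative, not Flink's. */
class PrivateLockSink {
    // Private monitor object: only this class can synchronize, wait, or notify on it.
    private final Object lock = new Object();
    private int retries;

    /** Records one retry attempt under the private lock. */
    void recordRetry() {
        synchronized (lock) {
            retries++;
        }
    }

    /** Reads the counter under the same lock, so callers never touch the monitor. */
    int getRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}
```

A test would then poll `getRetries()` rather than synchronizing on the sink instance.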



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37750694
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -88,6 +136,7 @@ public void invoke(IN value) {
     	 */
     	private void closeConnection(){
     		try {
    +			isRunning = false;
    --- End diff --
    
    You should wake up the waiting thread here. Otherwise waiting on a lock does not make any sense.
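
A minimal sketch of what waking the waiter on close could look like. It assumes a hypothetical `InterruptibleRetrySink` with a private lock and an `isRunning` flag, and is not the actual `SocketClientSink`. The point is that `close()` both clears the flag and calls `notifyAll()`, so a thread sleeping between retries returns immediately instead of waiting out the full timeout.

```java
/** Hypothetical sketch of a sink whose retry wait can be cut short by close(). */
class InterruptibleRetrySink {
    private final Object lock = new Object();
    private volatile boolean isRunning = true;

    /** Waits up to sleepMillis between retries, returning early once close() is called. */
    void waitBeforeRetry(long sleepMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + sleepMillis;
        synchronized (lock) {
            long remaining = sleepMillis;
            // Loop guards against spurious wake-ups and re-checks the flag each time.
            while (isRunning && remaining > 0) {
                lock.wait(remaining);
                remaining = deadline - System.currentTimeMillis();
            }
        }
    }

    /** Stops retrying and wakes any thread blocked in waitBeforeRetry(). */
    void close() {
        synchronized (lock) {
            isRunning = false;
            lock.notifyAll();
        }
    }
}

class CloseWakesWaiterDemo {
    /** Returns true if a thread asked to wait 60 s exits within 5 s of close(). */
    static boolean closeWakesWaiter() {
        final InterruptibleRetrySink sink = new InterruptibleRetrySink();
        Thread waiter = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sink.waitBeforeRetry(60_000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();
        sink.close();
        try {
            waiter.join(5_000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }
}
```

Without the `notifyAll()` in `close()`, the waiter in this sketch would stay blocked until its timeout expired.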



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38410820
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			LOG.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					retries++;
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +							ee.toString() + ". Retry time(s):" + retries);
    +
    +					if (lock == null) {
    +						lock = new Object();
    +					}
    +
    +					try {
    +						synchronized (lock) {
    +							lock.wait(CONNECTION_RETRY_SLEEP);
    +						}
    +					} catch(InterruptedException eee) {
    +						LOG.error(eee.toString());
    --- End diff --
    
    OK, I'll remove it.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38410780
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			LOG.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					retries++;
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +							ee.toString() + ". Retry time(s):" + retries);
    +
    +					if (lock == null) {
    --- End diff --
    
    Ah, this follows a comment from @tillrohrmann.
    See this:
    ![image](https://cloud.githubusercontent.com/assets/13193847/9603561/04dad2b4-50e4-11e5-96e3-143d4d5084bf.png)




[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-139104928
  
    I removed the toString() call and changed the lock to `private final SerializableObject lock`.
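
The thread does not say why a serializable lock type was chosen. One plausible reason, stated here as an assumption, is that the sink itself must be serializable, and a `java.lang.Object` field is not. The sketch below uses hypothetical stand-in classes (`SerializableLock`, `SinkWithPlainLock`, `SinkWithSerializableLock`) to show the difference with plain Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a serializable lock type: a class with no fields. */
class SerializableLock implements Serializable {
    private static final long serialVersionUID = 1L;
}

/** Serializable class holding a plain Object lock, which cannot be serialized. */
class SinkWithPlainLock implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Object lock = new Object();
}

/** Serializable class holding a serializable lock; the field can stay final. */
class SinkWithSerializableLock implements Serializable {
    private static final long serialVersionUID = 1L;
    private final SerializableLock lock = new SerializableLock();
}

class LockSerializationDemo {
    /** Returns true if obj can be written with Java serialization. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With a serializable lock the field can be `final` and initialized eagerly, which avoids the lazy `if (lock == null)` initialization seen in the earlier revision of the diff.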



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37745169
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    --- End diff --
    
    What is `t1` used for?



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37744540
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    --- End diff --
    
    I think it would be good to reset the error to `null` here. Otherwise all subsequent tests will fail with the same, completely unrelated error message.
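
One way to read and clear the error in a single step is `AtomicReference.getAndSet(null)`. The sketch below wraps it in a hypothetical `ErrorHolder` helper that is not part of the PR. Note that JUnit 4 normally creates a fresh test-class instance for each test method, so the reset matters mainly when the holder is static or otherwise shared between tests.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper that records the first error from a spawned thread. */
class ErrorHolder {
    private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

    /** Stores t only if no error has been recorded yet. */
    void record(Throwable t) {
        error.compareAndSet(null, t);
    }

    /** Returns the recorded error, or null, and clears it atomically. */
    Throwable takeAndReset() {
        return error.getAndSet(null);
    }
}
```

A test would call `takeAndReset()` after joining the worker thread and fail only if the result is non-null.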



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37563424
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    +
    +					Thread.sleep(CONNECTION_RETRY_SLEEP);
    --- End diff --
    
    We should definitely log the exception so that the user can understand why the sink had problems.
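
A minimal sketch of logging each failed attempt inside a retry loop. To stay dependency-free it uses `java.util.logging`, whereas the PR's later revision logs through a `LOG` field. `SendAction`, `LoggingRetry` and `sendWithRetry` are hypothetical names introduced for illustration.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical send action that may fail with an IOException. */
interface SendAction {
    void send() throws IOException;
}

class LoggingRetry {
    private static final Logger LOG = Logger.getLogger(LoggingRetry.class.getName());

    /** Tries the action up to maxRetry times; returns the attempt that succeeded, or -1. */
    static int sendWithRetry(SendAction action, int maxRetry, long sleepMillis) {
        for (int attempt = 1; attempt <= maxRetry; attempt++) {
            try {
                action.send();
                return attempt;
            } catch (IOException e) {
                // Pass the exception itself so the cause and stack trace reach the log.
                LOG.log(Level.WARNING, "Send failed on attempt " + attempt + " of " + maxRetry, e);
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

Passing the exception object to the logger, rather than only its `toString()`, keeps the stack trace available to the user.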



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37745343
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    +							try {
    +								//socket is closed then test "retry"
    +								simpleSink.invoke("testSocketSinkInvoke");
    +								simpleSink.close();
    +							} catch (Exception e) {
    +								error.set(e);
    +							}
    +						}
    +
    +					}).start();
    +
    +					//set a new server to let the retry success.
    +					while (simpleSink.retries == 0){
    +						synchronized (simpleSink) {
    +							simpleSink.notifyAll();
    --- End diff --
    
    Where is the corresponding `wait`?



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37744939
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    --- End diff --
    
    Adding a short description of the test case as a comment would be helpful.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748875
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    --- End diff --
    
    But it's never used within the `run` method.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37562991
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    +
    +					Thread.sleep(CONNECTION_RETRY_SLEEP);
    +					continue;
    --- End diff --
    
    Why continue here?
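
    For illustration, a minimal sketch of a bounded retry loop in which no `continue` is needed, because the `catch` block is already the last statement of the loop body. The class `RetrySketch`, the method `retry` and its parameters are hypothetical names, not part of this pull request or of Flink; the sketch only assumes that the action may throw and should be attempted a fixed number of times.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Calls the action up to maxAttempts times and returns its first successful result.
     * If every attempt fails, the last failure is rethrown to the caller.
     */
    public static <T> T retry(Callable<T> action, int maxAttempts, long sleepMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                lastFailure = e;
                // Sleep only if another attempt will follow.
                if (attempt < maxAttempts) {
                    Thread.sleep(sleepMillis);
                }
                // No 'continue' here: the loop moves on to the next attempt by itself.
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = new int[1];
        String result = retry(new Callable<String>() {
            @Override
            public String call() throws IOException {
                // Hypothetical action: fails on the first two calls, then succeeds.
                calls[0]++;
                if (calls[0] < 3) {
                    throw new IOException("not connected yet");
                }
                return "ok";
            }
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

    Rethrowing the last failure once the attempts are used up keeps the original cause visible to the caller instead of dropping it.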



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-135630392
  
    The notifyAll() call in my test file had a bug: it could sometimes be called before the wait().
    In that case the wait() would block forever.
    I changed this to use a sub-thread with join() to avoid the problem.
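
    As an illustration of the race described above, here is a minimal sketch that replaces the wait()/notifyAll() pair with a `java.util.concurrent.CountDownLatch`. A latch remembers that it was counted down, so the signal is not lost if it arrives before the waiting thread starts waiting. The class `LatchHandoffSketch` and the method `runOnce` are hypothetical names used only for this example; this is not the approach taken in the pull request, which uses a sub-thread with join().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchHandoffSketch {

    /**
     * Starts a worker that waits for a "server closed" signal.
     * Returns true if the worker saw the signal within the timeout,
     * regardless of whether the signal was sent before or after the worker started.
     */
    public static boolean runOnce(boolean signalBeforeWorkerStarts) throws InterruptedException {
        final CountDownLatch serverClosed = new CountDownLatch(1);
        final boolean[] observed = new boolean[1];

        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Returns immediately if countDown() has already happened.
                    observed[0] = serverClosed.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        if (signalBeforeWorkerStarts) {
            // With wait()/notifyAll() this ordering would lose the notification.
            serverClosed.countDown();
            worker.start();
        } else {
            worker.start();
            serverClosed.countDown();
        }

        // join() makes the worker's write to observed[0] visible to this thread.
        worker.join();
        return observed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce(true));
        System.out.println(runOnce(false));
    }
}
```

    The timeout on `await` is a precaution so that a test built this way fails instead of hanging if the signal is never sent.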



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37744808
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    --- End diff --
    
    Why is this exception good? From where does it originate?
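
    One way to make the origin of such an exception explicit is to capture whatever the spawned thread throws and assert on its type in the test thread, instead of catching `Exception` and ignoring it. The following is only a sketch under that assumption; `ExpectedFailureSketch` and `runAndCapture` are hypothetical names and do not exist in the pull request or in Flink.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

public class ExpectedFailureSketch {

    /**
     * Runs the task on a separate thread and returns the Throwable it threw,
     * or null if the task completed normally.
     */
    public static Throwable runAndCapture(final Callable<Void> task) throws InterruptedException {
        final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    task.call();
                } catch (Throwable t) {
                    // Record everything, including AssertionError, for the calling thread.
                    thrown.set(t);
                }
            }
        });
        worker.start();
        worker.join();
        return thrown.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable failure = runAndCapture(new Callable<Void>() {
            @Override
            public Void call() throws IOException {
                // Hypothetical stand-in for a write to a socket whose server is gone.
                throw new IOException("connection refused");
            }
        });
        // The calling thread can now check that the failure is the expected one.
        System.out.println(failure instanceof IOException);
    }
}
```

    With this shape, an unexpected exception type or a failed assertion inside the worker reaches the test thread, where it can fail the test.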



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1030



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-142173138
  
    @StephanEwen 
    Hi, I'm very sorry for my poor Java style.
    Many thanks for your rework. I did a full review of your new fixes and got the points.
    I'll be more careful next time!
    Also, thanks for the book. I'll be studying more from now on.
    Overall, thank you very much for your time!



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37824613
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    --- End diff --
    
    Actually, the first invoke call will succeed.
    I am not very familiar with TCP.
    However, to make the sink reconnect, you need to call invoke twice, because the sink tries to reconnect on the second call rather than the first.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37745480
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    --- End diff --
    
    Same here.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-132508921
  
    Added tests for 10 retries and for 0 retries.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37563275
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    --- End diff --
    
    I think we should at least log the exception.
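    A minimal sketch of what logging the exception could look like, assuming `java.util.logging` purely for illustration (the project may use a different logging facade; the class and method names below are hypothetical). Passing the exception object itself, rather than only its `toString()`, keeps the stack trace in the log:

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; names are illustrative and not taken from the PR.
class LogBeforeRetrySketch {

    static final Logger LOG = Logger.getLogger(LogBeforeRetrySketch.class.getName());

    /** Logs a failed send with the causing exception attached to the log record. */
    static void logSendFailure(String hostName, int port, IOException cause) {
        LOG.log(Level.SEVERE,
                "Cannot send message to socket server at " + hostName + ":" + port
                        + ". Trying to reconnect.",
                cause);
    }
}
```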


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37751723
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    +							try {
    +								//socket is closed then test "retry"
    +								simpleSink.invoke("testSocketSinkInvoke");
    +								simpleSink.close();
    +							} catch (Exception e) {
    +								error.set(e);
    +							}
    +						}
    +
    +					}).start();
    +
    +					//set a new server to let the retry success.
    +					while (simpleSink.retries == 0){
    +						synchronized (simpleSink) {
    +							simpleSink.notifyAll();
    +						}
    +					}
    +
    +					//reopen socket server for sink access
    +					value = "";
    +					ServerSocket server = new ServerSocket(port);
    --- End diff --
    
    This might be a problem if this one thread is really slow. An explicit synchronization mechanism would be better.
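    One possible form of explicit synchronization, sketched with `java.util.concurrent.CountDownLatch`. This is an illustration only, not the code from the PR: the class name, the latch names, and the attempt counter are hypothetical stand-ins for the sink's retry state. The worker signals when its first attempt has happened, and the main thread blocks on that signal instead of spinning on a counter:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the test's retry handshake; not the code from the PR.
class LatchHandshakeSketch {

    /** Returns {attempts seen before the "server" came back, attempts at the end}. */
    static int[] runHandshake() throws InterruptedException {
        final CountDownLatch firstAttemptSeen = new CountDownLatch(1);
        final CountDownLatch serverReopened = new CountDownLatch(1);
        final AtomicInteger attempts = new AtomicInteger();

        Thread worker = new Thread(() -> {
            attempts.incrementAndGet();       // first attempt (would fail in the real test)
            firstAttemptSeen.countDown();     // signal: a retry is now needed
            try {
                serverReopened.await();       // block until the server is available again
                attempts.incrementAndGet();   // second attempt (would succeed)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // Wait for the signal with a timeout instead of polling a field in a busy loop.
        if (!firstAttemptSeen.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker never made its first attempt");
        }
        int seenBeforeReopen = attempts.get();

        serverReopened.countDown();           // "reopen the server" and release the worker
        worker.join();
        return new int[] {seenBeforeReopen, attempts.get()};
    }
}
```

    Because the main thread only proceeds after the latch is counted down, the outcome does not depend on how fast or slow either thread runs.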


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37752742
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +85,49 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			Log.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					Log.error("Reconnect to socket server and send message failed. Caused by " +
    +								ee.toString() + ". Retry time(s):" + retries);
    +					synchronized (this) {
    +						this.wait(CONNECTION_RETRY_SLEEP);
    --- End diff --
    
    This can throw an `InterruptedException`. It should be handled, for example by checking the termination criterion.
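    A minimal sketch of one way to treat the interruption in a bounded retry loop. It is an assumption-laden illustration, not the PR's code: `RetrySketch`, `Attempt`, and `retry` are hypothetical names, and `Thread.sleep` is used in place of the `wait` call in the diff. On interruption the loop stops, restores the interrupt flag, and reports the failure instead of continuing to retry:

```java
import java.io.IOException;

// Hypothetical sketch of a bounded retry loop; names are not from the PR.
class RetrySketch {

    /** One attempt to (re)connect and send; hypothetical callback type. */
    interface Attempt {
        void run() throws IOException;
    }

    /** Runs the attempt up to maxAttempts times; returns the number of attempts used. */
    static int retry(Attempt attempt, int maxAttempts, long sleepMillis) throws IOException {
        IOException last = null;
        for (int used = 1; used <= maxAttempts; used++) {
            try {
                attempt.run();
                return used;
            } catch (IOException e) {
                last = e;                     // remember the failure and back off
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException ie) {
                // Interruption is a request to stop: restore the flag and end the loop.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting to reconnect", ie);
            }
        }
        throw new IOException("Giving up after " + maxAttempts + " attempts", last);
    }
}
```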


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134822139
  
    @tillrohrmann 
    Hi, I fixed the conflict and got the CI to rerun.
    Would you please take a look at my new changes?
    Are there any new comments?


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134196700
  
    Hi,
    Thank you.
    I'll push a new fix.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37562745
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    --- End diff --
    
    We usually add whitespaces between keywords.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134175239
  
    Hi @HuangWHWHW, I had some comments concerning the test cases.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-137747552
  
    @StephanEwen 
    @tillrohrmann
    Hi,
    I got the CI to rerun.
    Any new comments?


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-142014348
  
    I reworked this quite heavily during merging. There were a lot of issues that were against good Java style:
      - Variables in the classes, rather than in methods
      - The way references to threads were obtained
      - Defining clear parameter checks and exceptions
      - Handling InterruptedExceptions
      - polling versus clear conditions when state can be checked
    
    You can have a look at the code after my fixes, to see these issues in context.
    
    I would suggest getting a Java book (such as "Effective Java", which is a good one) and taking it as a guideline for future work. This pull request took more than 70 comments and still needed quite some rework (not for Flink-specific issues, but all of it general Java style/efficiency/correctness). I am afraid we cannot do that for every pull request; it would be completely overwhelming...
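
    To illustrate the "clear parameter checks and exceptions" point from the list above, a minimal sketch might look like the following. The class and field names are hypothetical and are not taken from the merged code, and the rule that the retry count must be non-negative is an assumption made for the example:

```java
import java.util.Objects;

// Hypothetical illustration of constructor-time parameter checks; not the merged Flink code.
class SinkSettingsSketch {

    final String hostName;
    final int port;
    final int maxNumRetries;

    SinkSettingsSketch(String hostName, int port, int maxNumRetries) {
        // Fail fast with a descriptive message instead of failing later inside invoke().
        this.hostName = Objects.requireNonNull(hostName, "hostName must not be null");
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (maxNumRetries < 0) {
            throw new IllegalArgumentException("maxNumRetries must be >= 0, was " + maxNumRetries);
        }
        this.port = port;
        this.maxNumRetries = maxNumRetries;
    }
}
```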


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37744832
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    --- End diff --
    
    If this assertion is false, then your test won't fail.
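
To illustrate the point above: an `AssertionError` raised inside a spawned thread only ends that thread, while JUnit observes the thread that runs the test method. The following is a minimal sketch in plain Java, not the code that was merged; the class and method names (`SpawnedThreadAssertionSketch`, `runAndCapture`, `failingCheck`) are hypothetical. It captures any `Throwable` from the worker so that the calling thread can report it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper for illustration; not part of the pull request.
class SpawnedThreadAssertionSketch {

    /** Runs the check on a new thread and returns whatever it threw, or null. */
    static Throwable runAndCapture(final Runnable check) {
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    check.run();
                } catch (Throwable t) {
                    // Catch Throwable, not Exception: AssertionError is an Error.
                    error.set(t);
                }
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the interruption.
            Thread.currentThread().interrupt();
            return e;
        }
        return error.get();
    }

    /** Stand-in for a failing assertEquals(expected, actual) call. */
    static void failingCheck() {
        throw new AssertionError("expected:<10> but was:<0>");
    }
}
```

In a JUnit test, the calling thread would then fail explicitly when the returned `Throwable` is not null, for example with `fail("Error in spawned thread: " + t.getMessage())`, as the test already does for its `error` field.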



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37744849
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    --- End diff --
    
    Same here.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37618025
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
     		assertEquals("Connected", this.access);
     		assertEquals("testSocketSinkInvoke", value);
     	}
    +
    +	public Thread t;
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					Thread.sleep(10000);
    --- End diff --
    
    To be honest, I'm not a big fan of `sleep`-based synchronization. Too often these kinds of tests have failed on Travis. If you use sleeps, the interval is usually either too short to allow for different interleavings if you have bad luck, or too long, which makes the test slow. Therefore, I'd propose a simple wait object on which you wait from within the thread. Once you've closed the server socket, you can call the `notifyAll` method on this wait object to let the thread continue.
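
A minimal sketch of such a wait object, assuming a dedicated lock plus a boolean flag. The flag is an addition not mentioned in the comment: it guards against a notification that arrives before the thread starts waiting, and against spurious wakeups. The names `WaitObjectSketch`, `awaitServerClosed`, `signalServerClosed` and `demo` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of a wait object; not the code from the pull request.
class WaitObjectSketch {

    private final Object lock = new Object();
    private boolean serverClosed = false; // guarded by lock

    /** Called from the sink thread: blocks until signalServerClosed() has run. */
    void awaitServerClosed() throws InterruptedException {
        synchronized (lock) {
            // Loop on the condition so an early notify or a spurious wakeup is harmless.
            while (!serverClosed) {
                lock.wait();
            }
        }
    }

    /** Called from the test thread once the server socket has been closed. */
    void signalServerClosed() {
        synchronized (lock) {
            serverClosed = true;
            lock.notifyAll();
        }
    }

    /** Returns true if the worker continued after the signal was given. */
    static boolean demo() {
        final WaitObjectSketch gate = new WaitObjectSketch();
        final AtomicBoolean proceeded = new AtomicBoolean(false);
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    gate.awaitServerClosed();
                    proceeded.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();
        gate.signalServerClosed();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return proceeded.get();
    }
}
```

Unlike a fixed `Thread.sleep(10000)`, the worker resumes as soon as the signal is given, regardless of how long closing the server takes.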



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37751549
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    --- End diff --
    
    But the invoke will fail already during the first call, right?



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134573477
  
    It should be re-triggered every time you push something new to your branch.
    For your local CI you should be able to manually restart a build by going
    to Travis and clicking the restart button.
    
    On Tue, Aug 25, 2015 at 11:29 AM, HuangWHWHW <no...@github.com>
    wrote:
    
    > @tillrohrmann <https://github.com/tillrohrmann>
    > BTW:How to make the CI rerun?




[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-142272588
  
    @HuangWHWHW No problem, we all learn all the time.
    It would just make it easier to review and merge pull requests if the style followed Java best practices more closely. It is something that you will learn fast, I am sure.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134577603
  
    @tillrohrmann 
    OK, I will push an update to my branch.
    But I cannot access Travis since it is blocked in China.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37745469
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    --- End diff --
    
    Why do you have to call `simpleSink.invoke` twice? Isn't the socket already closed before the first invoke call comes?



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37599157
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
     		assertEquals("Connected", this.access);
     		assertEquals("testSocketSinkInvoke", value);
     	}
    +
    +	public Thread t;
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					Thread.sleep(10000);
    --- End diff --
    
    This waits for the socket server to close.




[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37563322
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    --- End diff --
    
    Do we really want to catch all exceptions here? Maybe `IOException` is enough?
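
As a rough sketch of what narrowing the catch clause would mean: a retry loop that catches only `IOException` retries on connection failures, while a programming error such as an `IllegalStateException` propagates immediately instead of being retried. The names `NarrowCatchSketch`, `Attempt` and `attemptsUntilSuccess` are hypothetical and only serve the illustration.

```java
import java.io.IOException;

// Hypothetical illustration; not the retry logic from the pull request.
class NarrowCatchSketch {

    /** One connection-and-write attempt that may fail with an I/O error. */
    interface Attempt {
        void run() throws IOException;
    }

    /**
     * Runs the attempt up to maxAttempts times.
     * Returns the number of the attempt that succeeded, or -1 if all failed.
     */
    static int attemptsUntilSuccess(Attempt attempt, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                attempt.run();
                return i;
            } catch (IOException e) {
                // Expected failure mode (for example, connection refused): try again.
            }
            // RuntimeExceptions and Errors are not caught, so they surface right away.
        }
        return -1;
    }
}
```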



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37563984
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    +
    +					Thread.sleep(CONNECTION_RETRY_SLEEP);
    --- End diff --
    
    Maybe it's also better to use a `synchronized(lock) { lock.wait(CONNECTION_RETRY_SLEEP) }` here. That way, the `closeConnection` method can call `lock.notifyAll` to wake up all sleeping threads directly, which will speed up the shutdown procedure.
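
The suggestion above could look roughly like the following sketch. It is an illustration under assumptions, not the code from the PR: the class `RetryWaiter` and its method names are hypothetical, and only the ideas of waiting on a `lock`, checking `isRunning`, and calling `notifyAll` on close come from the discussion.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of SocketClientSink.
class RetryWaiter {

    private final Object lock = new Object();
    private volatile boolean isRunning = true;

    /**
     * Waits up to timeoutMillis before the next retry, but returns early
     * when close() is called. Returns true if the caller should keep retrying.
     */
    boolean waitBeforeRetry(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            // Loop to guard against spurious wake-ups.
            while (isRunning) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    break; // note: lock.wait(0) would wait forever
                }
                lock.wait(remainingMillis);
            }
        }
        return isRunning;
    }

    /** Stops retrying and wakes up every thread currently waiting. */
    void close() {
        isRunning = false;
        synchronized (lock) {
            lock.notifyAll();
        }
    }
}
```

Compared with `Thread.sleep(...)`, a thread parked in `waitBeforeRetry` returns as soon as `close()` runs, so shutdown does not have to sit out the remaining retry delay.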


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37744884
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    --- End diff --
    
    Same here as with `testSocketSinkNoRetry`.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38408571
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			LOG.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					retries++;
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +							ee.toString() + ". Retry time(s):" + retries);
    +
    +					if (lock == null) {
    --- End diff --
    
    The `lock` part can probably be replaced by a simple `Thread.sleep(...)`.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-136285097
  
    @tillrohrmann 
    Hi, very sorry for disturbing you.
    I have changed the PR according to your comments, and it passed the CI.
    I would be thankful if you could take a look.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37824211
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    --- End diff --
    
    I think t1 is not necessary.
    I will remove it.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37643848
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
     		assertEquals("Connected", this.access);
     		assertEquals("testSocketSinkInvoke", value);
     	}
    +
    +	public Thread t;
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					Thread.sleep(10000);
    --- End diff --
    
    Yes, I understand.
    I'll make the change :-)


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37564115
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -125,4 +121,102 @@ public void testSocketSink() throws Exception{
     		assertEquals("Connected", this.access);
     		assertEquals("testSocketSinkInvoke", value);
     	}
    +
    +	public Thread t;
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					Thread.sleep(10000);
    --- End diff --
    
    Why do we have to sleep here?


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r39020317
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			LOG.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					retries++;
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +							ee.toString() + ". Retry time(s):" + retries);
    +
    +					if (lock == null) {
    --- End diff --
    
    I see, that is good. For that to be safe, though, the lock should be initialized with the class.
    
    In general, it is good practice to only use final references as locks. If the class needs to be serializable, use the `SerializableObject`.
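
A minimal sketch of this advice, under stated assumptions. `SerializableLock` below is a hypothetical stand-in for the `SerializableObject` mentioned above (an empty serializable class used only as a monitor), and `SinkWithLock` is an illustrative class, not `SocketClientSink`. Because the lock is `final` and created with the instance, there is no `lock == null` case to handle, and the reference cannot be swapped while another thread is waiting on it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable lock object.
final class SerializableLock implements Serializable {
    private static final long serialVersionUID = 1L;
}

// Illustrative serializable class that owns its lock.
class SinkWithLock implements Serializable {
    private static final long serialVersionUID = 1L;

    // Final and initialized with the instance: never null, never replaced.
    private final SerializableLock lock = new SerializableLock();

    boolean lockIsInitialized() {
        return lock != null;
    }

    /** Wakes up any thread waiting on the lock. */
    void wakeUp() {
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /** Serializes and deserializes the sink, as happens when a function is shipped to a worker. */
    static SinkWithLock roundTrip(SinkWithLock sink) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SinkWithLock) in.readObject();
        }
    }
}
```

A plain `new Object()` could not be used here, since `Object` is not serializable and writing the enclosing instance would fail. Marking the lock `transient` instead would leave it `null` after deserialization, which is the situation the lazy `if (lock == null)` check in the diff tries to work around.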


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-136702507
  
    @StephanEwen 
    Hi, I have made the changes for your comments, except for the synchronized (lock) in SocketClientSink.java.
    Do I need to change this to Thread.sleep()?


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-139101747
  
    Sorry for being careless.
    I forgot to change the git global user.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-133238105
  
    "Could we also add a test case where we test that the SocketClientSink can reconnect against a newly opened socket after it has been closed? This would be great."
    
    Good idea!
    I will try to do it.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38408542
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			LOG.error("Cannot send message " + value.toString() +
    --- End diff --
    
    You can log the exception more simply like this:
    ```
    LOG.error("Cannot send message " + value + " to socket server at " + hostName + ":" + port + ". Trying to reconnect.", e);
    ```
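
As a self-contained illustration of the same idea, the sketch below uses `java.util.logging` from the JDK. That is a swapped-in logger chosen so the example runs without dependencies. The logging framework behind `LOG` in the PR is not shown in this thread, and the class and method names here are hypothetical. The point carried over is that the exception is passed as a separate argument instead of being concatenated into the message, so the logger keeps the full stack trace.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class; java.util.logging stands in for the PR's logger.
class LogWithCauseDemo {

    static final Logger LOG = Logger.getLogger("LogWithCauseDemo");

    /** Logs a failed send and attaches the exception itself to the log record. */
    static void reportSendFailure(Object value, String hostName, int port, IOException e) {
        LOG.log(Level.SEVERE,
                "Cannot send message " + value + " to socket server at "
                        + hostName + ":" + port + ". Trying to reconnect.",
                e);
    }
}
```

String concatenation already calls `toString()` on `value`, so the explicit `value.toString()` in the diff is unnecessary, and the shorter form prints `null` for a null value instead of throwing.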


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748764
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    --- End diff --
    
    Wouldn't it make sense to explicitly check for it then?
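
A minimal sketch of what such an explicit check could look like, using only the JDK. The class `ExpectedFailureCheck`, the helper `captureFailure`, and the stand-in `sendToClosedServer` are hypothetical names for illustration and are not part of the PR. The stand-in assumes the sink ends up throwing a `RuntimeException` that wraps an `IOException`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

class ExpectedFailureCheck {

    /** Runs the action and returns what it threw, or null if it completed normally. */
    public static Throwable captureFailure(Callable<?> action) {
        try {
            action.call();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    /** Hypothetical stand-in for invoke() against a server that never reopens. */
    public static Void sendToClosedServer() {
        throw new RuntimeException("Cannot send message to socket server",
                new IOException("Connection refused"));
    }

    public static void main(String[] args) {
        Throwable failure = captureFailure(ExpectedFailureCheck::sendToClosedServer);

        // Check for the expected failure explicitly instead of swallowing every Exception.
        if (!(failure instanceof RuntimeException)
                || !(failure.getCause() instanceof IOException)) {
            throw new AssertionError("Expected RuntimeException caused by IOException, got: " + failure);
        }
        System.out.println("Got the expected failure: " + failure.getMessage());
    }
}
```

With this shape, a test fails both when no exception is thrown and when an unrelated exception shows up, rather than passing silently in either case.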


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37750633
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +85,49 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			Log.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					Log.error("Reconnect to socket server and send message failed. Caused by " +
    +								ee.toString() + ". Retry time(s):" + retries);
    +					synchronized (this) {
    --- End diff --
    
    Locking on `this` is usually not recommended because outside code can influence your behaviour here. Thus, use a private lock field here.
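
A rough sketch of the private-lock idea, assuming the retry delay should also end early when the sink is stopped. The class `ReconnectBackoff` and its methods `pause` and `stop` are hypothetical names, not the PR's code.

```java
import java.util.concurrent.TimeUnit;

class ReconnectBackoff {

    // Private, final lock object: code outside this class cannot synchronize on it.
    private final Object lock = new Object();
    private boolean running = true; // guarded by lock

    /**
     * Waits up to the given time between reconnect attempts.
     * Returns true if retrying should continue, false after stop() or an interrupt.
     */
    public boolean pause(long millis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        synchronized (lock) {
            try {
                while (running) {
                    long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                    if (remainingMs <= 0) {
                        break; // also avoids wait(0), which would block indefinitely
                    }
                    lock.wait(remainingMs); // loop guards against spurious wakeups
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            }
            return running;
        }
    }

    /** Stops retrying and wakes up any thread currently waiting in pause(). */
    public void stop() {
        synchronized (lock) {
            running = false;
            lock.notifyAll();
        }
    }
}
```

Because the lock is created once in a final field, there is also no need for a lazy `lock == null` check at the point of use.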


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37599021
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    +
    +					Thread.sleep(CONNECTION_RETRY_SLEEP);
    +					continue;
    --- End diff --
    
    Sorry, that was careless of me.
    I will just remove it.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134107315
  
    @tillrohrmann 
    Hi,
    I pushed a new fix and added a test for a successful retry.
    Would you please take a look?
    Thank you.
    
    BTW: Why didn't the CI rerun?


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r38408757
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +90,56 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			LOG.error("Cannot send message " + value.toString() +
    +					" to socket server at " + hostName + ":" + port + ". Caused by " + e.toString() +
    +					". Trying to reconnect.");
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					retries++;
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				} catch(IOException ee) {
    +					LOG.error("Reconnect to socket server and send message failed. Caused by " +
    +							ee.toString() + ". Retry time(s):" + retries);
    +
    +					if (lock == null) {
    +						lock = new Object();
    +					}
    +
    +					try {
    +						synchronized (lock) {
    +							lock.wait(CONNECTION_RETRY_SLEEP);
    +						}
    +					} catch(InterruptedException eee) {
    +						LOG.error(eee.toString());
    --- End diff --
    
    I think there is no need to log this. Interrupting the thread usually means that it has been shut down.
    
    If you want to log exceptions, log them via ("message", exception), here `LOG.error("Reconnect delay interrupted", e);`
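
A small sketch of both points, under the assumption that `java.util.logging` stands in for the logger used in the PR so that the example runs on a plain JDK. `RetryDelay`, `sleepBeforeRetry`, and `logFailure` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class RetryDelay {

    private static final Logger LOG = Logger.getLogger(RetryDelay.class.getName());

    /** Sleeps between reconnect attempts; returns false if the thread was interrupted. */
    public static boolean sleepBeforeRetry(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // An interrupt usually means shutdown: no logging, restore the flag, stop retrying.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Logs a failed attempt with the exception attached, so the stack trace is kept. */
    public static void logFailure(int attempt, Exception cause) {
        LOG.log(Level.WARNING, "Reconnect attempt " + attempt + " failed", cause);
    }
}
```

Passing the exception as a separate argument, instead of concatenating `toString()` into the message, lets the logging backend print the full stack trace.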


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37598999
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    --- End diff --
    
    Yes, it's up to you.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37747966
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    --- End diff --
    
    The socket server never reopens, so the reconnect loop throws an exception once the retries are exhausted.
    It originates here (SocketClientSink.java:127):
    if (!success) {
        throw new RuntimeException("Cannot send message " + value.toString() +
                " to socket server at " + hostName + ":" + port, e);
    }


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138842285
  
    @tillrohrmann 
    Ah, sorry for bothering you.
    It doesn't matter.
    I just thought I had done something wrong in the community.
    :-D


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37752541
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    +							try {
    +								//socket is closed then test "retry"
    +								simpleSink.invoke("testSocketSinkInvoke");
    +								simpleSink.close();
    +							} catch (Exception e) {
    +								error.set(e);
    +							}
    +						}
    +
    +					}).start();
    +
    +					//set a new server to let the retry success.
    +					while (simpleSink.retries == 0){
    --- End diff --
    
    You're accessing shared state here from multiple threads. In this case it might happen that you don't see the updates to `simpleSink.retries` until it's too late and it has already reached the maximum number of retries.
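
One way to avoid the visibility problem, sketched with JDK concurrency utilities. This assumes the counter may be changed from a plain field to an `AtomicInteger` and that the test may wait on a latch instead of spinning. `VisibleRetryCounter` and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class VisibleRetryCounter {

    // AtomicInteger gives the cross-thread visibility that a plain int field lacks.
    private final AtomicInteger retries = new AtomicInteger();

    // Lets another thread block until the first retry has happened, without busy-waiting.
    private final CountDownLatch firstRetry = new CountDownLatch(1);

    /** Called by the retrying thread on every reconnect attempt. */
    public void recordRetry() {
        retries.incrementAndGet();
        firstRetry.countDown();
    }

    public int getRetries() {
        return retries.get();
    }

    /** Returns true once a retry has been recorded, false on timeout or interrupt. */
    public boolean awaitFirstRetry(long timeoutMs) {
        try {
            return firstRetry.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The test thread would then call `awaitFirstRetry(...)` before opening the new server instead of looping on `retries == 0`. Declaring the field `volatile` would also make the updates visible, but the loop would still spin.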


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-133111629
  
    Hi @HuangWHWHW, I had some minor comments. Could we also add a test case where we test that the `SocketClientSink` can reconnect against a newly opened socket after it has been closed? This would be great.
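
A rough, self-contained sketch of the scenario such a test would cover: the server is down, the client keeps retrying, and a new server socket is opened on the same port. It does not use `SocketClientSink`; `ReconnectSketch`, `sendWithRetry`, and `demo` are hypothetical stand-ins, and the delays are arbitrary. It also assumes the port stays free between closing and reopening.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

class ReconnectSketch {

    /** Connects and sends one line; returns the number of failed attempts before success. */
    public static int sendWithRetry(String host, int port, String line, int maxRetries, long delayMs)
            throws IOException, InterruptedException {
        for (int failed = 0; ; failed++) {
            try (Socket client = new Socket(host, port)) {
                OutputStream out = client.getOutputStream();
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
                out.flush();
                return failed;
            } catch (IOException e) {
                if (failed >= maxRetries) {
                    throw e; // retries exhausted
                }
                Thread.sleep(delayMs);
            }
        }
    }

    /** Closes a server, reopens it on the same port after a delay, and returns the received line. */
    public static String demo() {
        try {
            ServerSocket first = new ServerSocket(0);
            final int port = first.getLocalPort();
            first.close(); // server is down now, so connects are refused

            final AtomicReference<String> received = new AtomicReference<String>();
            Thread server = new Thread(() -> {
                try {
                    Thread.sleep(300); // keep the server down for a few client attempts
                    try (ServerSocket second = new ServerSocket()) {
                        second.setReuseAddress(true);
                        second.bind(new InetSocketAddress("127.0.0.1", port));
                        try (Socket peer = second.accept();
                             BufferedReader reader = new BufferedReader(new InputStreamReader(
                                     peer.getInputStream(), StandardCharsets.UTF_8))) {
                            received.set(reader.readLine());
                        }
                    }
                } catch (Exception e) {
                    // leave 'received' empty; the caller then sees null
                }
            });
            server.start();

            sendWithRetry("127.0.0.1", port, "reconnected", 100, 50);
            server.join();
            return received.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

In a real test for the sink, the retry loop would be replaced by a `SocketClientSink` configured with a non-zero retry count, and the assertion would be on the line read by the reopened server.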


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37563493
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -16,22 +16,18 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.streaming.api.functions;
    +package org.apache.flink.streaming.api.functions.sink;
     
    -import java.io.IOException;
    +import java.io.*;
    --- End diff --
    
    We don't use star imports.


---

[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748110
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    --- End diff --
    
    The 't1' is used to join the thread.
    That way, I can assert on the message the sink wrote.
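
    A minimal sketch of the join-then-assert pattern, not taken from the PR: the class name `JoinExample`, the method `runAndCollect`, and the `AtomicReference` that stands in for the sink are all illustrative assumptions. It keeps the `Thread` reference returned by the constructor instead of having the worker publish `Thread.currentThread()`.

```java
import java.util.concurrent.atomic.AtomicReference;

public class JoinExample {

    // Runs a worker thread that "writes" a message, waits for it, and returns what it wrote.
    static String runAndCollect(final String message) throws InterruptedException {
        final AtomicReference<String> written = new AtomicReference<String>();

        // Keep the Thread reference from the constructor so join() does not
        // depend on the worker having published itself first.
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                written.set(message); // hypothetical stand-in for sink.invoke(...)
            }
        });
        worker.start();

        // After join() returns, the worker has finished and its write is visible here.
        worker.join();
        return written.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndCollect("testSocketSinkInvoke"));
    }
}
```

    Joining on a locally held reference avoids the window in which the field is still null because the worker has not started running yet.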



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-138841521
  
    I think this looks good, except for the comment with the final variable for the lock.
    
    One more comment: When concatenating strings, avoid constructs like `" value=" + value.toString()`. Rather do `"value=" + value`. That is safe against null pointers.
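
    To illustrate the point about concatenation, here is a small sketch; the class `ConcatExample` and its two methods are hypothetical names and not part of the PR. String concatenation converts a null reference to the text "null", while an explicit `toString()` call on null throws.

```java
public class ConcatExample {

    // Concatenation turns a null reference into the text "null" instead of throwing.
    static String describe(Object value) {
        return "value=" + value;
    }

    // An explicit toString() call throws NullPointerException when value is null.
    static String describeUnsafe(Object value) {
        return "value=" + value.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(null));
        try {
            describeUnsafe(null);
        } catch (NullPointerException e) {
            System.out.println("toString() on null failed");
        }
    }
}
```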



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-134513103
  
    @tillrohrmann
    Hi, I have pushed a new fix.
    But regarding this:
    ![image](https://cloud.githubusercontent.com/assets/13193847/9461264/eccf96cc-4b3f-11e5-8d08-19ecd83eff7c.png)
    I have no good idea how to make the sink retry while also making sure it has not finished retrying by the time I reopen the socket server.
    So I changed the test "testSocketSinkRetryAccess" from retrying ten times to retrying forever, since the retry then never finishes until the method closeConnection() is called or the reconnect succeeds.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748350
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    --- End diff --
    
    The first time, after the server has closed, the server is still waiting for a FIN message (state: FIN_WAIT_2).
    The first call to invoke() makes the server really close.
    On the second call, the sink retries because the server is closed.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37823131
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    --- End diff --
    
    That is right.
    I see what you mean.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-140079694
  
    Sorry for the misunderstanding.
    I have changed the `LOG.error(e)` to `LOG.error(e.getMessage())` now.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37617867
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java ---
    @@ -73,13 +84,44 @@ public void intializeConnection() {
     	 *			The incoming data
     	 */
     	@Override
    -	public void invoke(IN value) {
    +	public void invoke(IN value) throws Exception {
     		byte[] msg = schema.serialize(value);
     		try {
     			dataOutputStream.write(msg);
     		} catch (IOException e) {
    -			throw new RuntimeException("Cannot send message " + value.toString() +
    -					" to socket server at " + hostName + ":" + port, e);
    +			retries = 0;
    +			boolean success = false;
    +			while ((retries < maxRetry || retryForever) && !success && isRunning){
    +				try {
    +
    +					if (dataOutputStream != null) {
    +						dataOutputStream.close();
    +					}
    +
    +					if (client != null && !client.isClosed()) {
    +						client.close();
    +					}
    +
    +					if (!retryForever){
    +						retries++;
    +					}
    +
    +					client = new Socket(hostName, port);
    +					dataOutputStream = new DataOutputStream(client.getOutputStream());
    +					dataOutputStream.write(msg);
    +					success = true;
    +
    +				}catch(Exception ee){
    --- End diff --
    
    Well, it actually depends on the exceptions for which we want to restart. If I'm not mistaken, then `new Socket()` and `dataOutputStream.write` only throw `IOExceptions`. Thus, we should change it.
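
    A minimal sketch of a retry loop that catches only `IOException`, as suggested above. It is not the code from the PR: `RetryExample`, `IoAction`, and `runWithRetries` are hypothetical names, and the action stands in for "reconnect and write". Any other exception, for example a `RuntimeException`, is not retried and propagates to the caller.

```java
import java.io.IOException;

public class RetryExample {

    /** Hypothetical stand-in for "reconnect and write the message". */
    interface IoAction {
        void run() throws IOException;
    }

    /**
     * Runs the action until it succeeds or maxRetries attempts have failed.
     * Returns the number of attempts used. Only IOException triggers a retry.
     */
    static int runWithRetries(IoAction action, int maxRetries) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (IOException e) {
                // Remember the failure and try again; other exception types are not caught.
                last = e;
            }
        }
        throw new IOException("Giving up after " + maxRetries + " attempts", last);
    }

    public static void main(String[] args) throws IOException {
        final int[] calls = {0};
        int attempts = runWithRetries(new IoAction() {
            @Override
            public void run() throws IOException {
                // Fail twice, then succeed, to simulate a server that comes back.
                if (++calls[0] < 3) {
                    throw new IOException("server not reachable");
                }
            }
        }, 10);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```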



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1030#discussion_r37748510
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.sink;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String value;
    +
    +	public Thread t;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +		value = "";
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +				} catch (Exception e){
    +					error.set(e);
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		BufferedReader rdr = new BufferedReader(new InputStreamReader(sk
    +				.getInputStream()));
    +		value = rdr.readLine();
    +
    +		t.join();
    +		server.close();
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals("testSocketSinkInvoke", value);
    +	}
    +
    +	@Test
    +	public void testSocketSinkNoRetry() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 0);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(0, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryTenTimes() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t){
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					//socket is closed then test "retry"
    +					simpleSink.invoke("testSocketSinkInvoke");
    +					simpleSink.close();
    +					assertEquals(10, simpleSink.retries);
    +				} catch (Exception e) {
    +					//This Exception is good since the socket server is never reopen.
    +				}
    +			}
    +		}).start();
    +
    +		Socket sk = server.accept();
    +		sk.setKeepAlive(false);
    +		sk.close();
    +		server.close();
    +
    +		synchronized (t) {
    +			t.notifyAll();
    +		}
    +
    +		t.join();
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSinkRetryAccess() throws Exception{
    +		ServerSocket server = new ServerSocket(0);
    +		port = server.getLocalPort();
    +
    +		new Thread(new Runnable() {
    +
    +			Thread t1;
    +			SocketClientSink<String> simpleSink;
    +
    +			@Override
    +			public void run() {
    +				t = Thread.currentThread();
    +				SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +					@Override
    +					public byte[] serialize(String element) {
    +						return element.getBytes();
    +					}
    +				};
    +
    +				try {
    +					simpleSink = new SocketClientSink<String>(host, port, simpleSchema, 10);
    +					simpleSink.open(new Configuration());
    +
    +					synchronized (t) {
    +						//wating for server to close
    +						t.wait();
    +					}
    +
    +					//firstly send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
    +					simpleSink.invoke("testSocketSinkInvoke");
    +
    +					new Thread(new Runnable() {
    +
    +						@Override
    +						public void run() {
    +							t1 = Thread.currentThread();
    +							try {
    +								//socket is closed then test "retry"
    +								simpleSink.invoke("testSocketSinkInvoke");
    +								simpleSink.close();
    +							} catch (Exception e) {
    +								error.set(e);
    +							}
    +						}
    +
    +					}).start();
    +
    +					//set a new server to let the retry success.
    +					while (simpleSink.retries == 0){
    +						synchronized (simpleSink) {
    +							simpleSink.notifyAll();
    +						}
    +					}
    +
    +					//reopen socket server for sink access
    +					value = "";
    +					ServerSocket server = new ServerSocket(port);
    --- End diff --
    
    This may be an issue.
    I don't have a perfect idea for it.
    So for now, this is just faster than retrying 10 times.



[GitHub] flink pull request: [FLINK-2536][streaming]add a re-connect for so...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1030#issuecomment-141952198
  
    Looks good now, will merge this...

