Posted to issues@flink.apache.org by HuangWHWHW <gi...@git.apache.org> on 2015/08/04 05:32:22 UTC

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/977

    [FLINK-2477][Add]Add tests for SocketClientSink

    Add tests for SocketClientSink.
    1. Test the SocketClientSink open function:
    set up a socket server --> use SocketClientSink to connect to the server.
    2. Test the SocketClientSink open and invoke functions:
    set up a socket server --> use SocketClientSink to connect to the server and send a message.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2477

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #977
    
----
commit 068ff28fb248722523914f385ebb5b1b1d144583
Author: HuangWHWHW <40...@qq.com>
Date:   2015-08-04T03:22:10Z

    [FLINK-2477][Add]Add tests for SocketClientSink

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-131144157
  
    I think this is good, minus two small comments.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127539612
  
    OK, I'll remember that next time, since the new CI run has already started.
    That's very kind of you.
    In addition, I noticed that flink-streaming-api has few test cases.
    Should I add some tests for it, since they would help find bugs?



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36671921
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    --- End diff --
    
    No need to manually try and find a free port. You can simply do a `new ServerSocket(0)` and it finds a free port.
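A minimal sketch of this suggestion, using only the JDK. The class and method names are hypothetical and not part of the pull request. Passing port 0 asks the operating system to pick a free ephemeral port, which can then be read back with `getLocalPort()`.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortExample {

    /**
     * Opens a server socket on a port chosen by the operating system.
     * The caller is responsible for closing the returned socket.
     */
    public static ServerSocket openOnFreePort() throws IOException {
        // Port 0 means "any free port"; no manual search loop is needed.
        return new ServerSocket(0);
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = openOnFreePort()) {
            // The actual port is only known after binding.
            System.out.println("Listening on port " + server.getLocalPort());
        }
    }
}
```

Keeping the socket open and reading the port from it avoids the race in which another process grabs a port between probing for it and binding to it.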



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36672245
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    +				try {
    +					this.server = new ServerSocket(port);
    +					break;
    +				} catch (Exception e) {
    +					--port;
    +					if (port > 0) {
    +						continue;
    +					}
    +					else{
    +						e.printStackTrace();
    +					}
    +				}
    +			}
    +		}
    +
    +		public void run() {
    +			System.out.println("Listenning...");
    +			try {
    +				sk = server.accept();
    +				access = "Connected";
    +				th = new ServerThread(sk);
    +				th.start();
    +			} catch (Exception e) {
    +				e.printStackTrace();
    +			}
    +		}
    +
    +		class ServerThread extends Thread {
    +
    +			Socket sk = null;
    +
    +			public ServerThread(Socket sk) {
    +				this.sk = sk;
    +			}
    +
    +			public void run() {
    +				try {
    +					access = "Invoked";
    +					wtr = new PrintWriter(sk.getOutputStream());
    +					rdr = new BufferedReader(new InputStreamReader(sk
    +							.getInputStream()));
    +					String line = rdr.readLine();
    +					System.out.println("Info from clients: " + line);
    +					wtr.println("Server received info: " + line + "'\n");
    +					wtr.flush();
    +					System.out.println("Return to client!");
    +				} catch (IOException e) {
    +					e.printStackTrace();
    +				}
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSink(){
    +		SocketServer server = new SocketServer(port);
    +		server.start();
    +
    +		SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +			@Override
    +			public byte[] serialize(String element) {
    +				return new byte[0];
    +			}
    +		};
    +
    +		SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema);
    +		simpleSink.open(new Configuration());
    +		simpleSink.invoke("testSocketSinkInvoke");
    +
    +		try {
    +			server.join();
    +			sleep(1000);
    --- End diff --
    
    No need to sleep after join. Join waits for the thread to terminate.
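To illustrate the point, here is a small self-contained sketch (hypothetical names, not code from the pull request). `Thread.join()` returns only once the worker has terminated, and everything the worker wrote is visible to the joining thread afterwards, so no extra `sleep` is required.

```java
public class JoinExample {

    /** Starts a worker thread, waits for it with join(), and returns what it wrote. */
    public static String runAndJoin() throws InterruptedException {
        final String[] holder = new String[1];

        Thread worker = new Thread(() -> holder[0] = "done");
        worker.start();

        // join() blocks until the worker has finished; its writes are
        // guaranteed to be visible here once join() returns.
        worker.join();

        return holder[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndJoin());
    }
}
```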



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-129785617
  
    @StephanEwen 
    Hi,
    I addressed all of your review comments except the one about using one thread (for server and connection).
    I kept the two threads because I plan to add some more complex tests later.
    If you think that isn't necessary, I'll change it.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127536235
  
    Don't worry, everyone was a novice once. :smile:
    
    You can do it either way. Either close this PR and we'll look at the other one or close the other one and update this one here.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-129570014
  
    This test actually does not fail if the sink does not work properly. It contains no assertions, but only prints the status.
    
    Here are a few points that we try to follow:
      - The test should assert that the correct result was produced.
      - All exceptions should cause the test to fail. That means either exceptions need to propagate out of the test method, or `Assert.fail()` needs to be called in the catch block. Be aware that failures in spawned threads need to be forwarded to the main thread, otherwise the unit test framework will not work.
      - Tests should not print anything, otherwise the terminal will become unreadable during testing. Output can be made with the logging framework.
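One possible way to forward failures from a spawned thread to the main thread is sketched below. The helper class and its method are hypothetical; the idea is only that the worker records any `Throwable` in an `AtomicReference`, and the calling thread rethrows it after `join()` so that the test method fails.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadErrorForwarding {

    /**
     * Runs the task on a new thread, waits for it, and rethrows on the
     * calling thread anything the task threw.
     */
    public static void runAndRethrow(Runnable task) throws Exception {
        final AtomicReference<Throwable> error = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Record the failure instead of printing it.
                error.set(t);
            }
        });
        worker.start();
        worker.join();

        Throwable t = error.get();
        if (t != null) {
            // Propagating out of the test method makes the test fail.
            throw new Exception("Error in spawned thread: " + t.getMessage(), t);
        }
    }
}
```

In a JUnit test, the test method would declare `throws Exception` and call something like `runAndRethrow(...)`, so a failure in the worker surfaces as a failed test rather than a stack trace on the console.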



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127832555
  
    @rmetzger 
    Hi, thank you very much.
    So, any progress lately?



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/977



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-128131074
  
    Hi,
    unfortunately the PR still needs a bit of work: The class documentation does not match the class. It would be good if the ports were not statically set, since this might interfere with concurrently running jobs. The code could benefit from a bit of commenting. For example, I don't know why we need two tests, one with invoke and one without.
    
    Maybe someone else can take a look? I'm on vacation right now and don't really have a lot of time.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-132100631
  
    @StephanEwen 
    Hi, I'm really sorry to bother you again.
    I just wonder whether this can be merged, or whether I need to make more fixes or close this PR.




[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127606171
  
    For the Travis problem, I would write an email to support@travis-ci.com. They are very friendly and helpful.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127533577
  
    Hi,
    you don't need to create a new pull request. You can also just push your changes to your old branch (HuangWHWHW:FLINK-2477 in this case) and then the pull request on GitHub will be updated. Git will complain, but you can force the push (remote name first, then branch name) by using:
    
    ```
    git push -f HuangWHWHW FLINK-2477
    ```



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127532073
  
    @aljoscha 
    Thank you for the help and the CI info.
    There was a bug in my earlier tests, and I have fixed it.
    I've already opened a new PR.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36672173
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    +				try {
    +					this.server = new ServerSocket(port);
    +					break;
    +				} catch (Exception e) {
    +					--port;
    +					if (port > 0) {
    +						continue;
    +					}
    +					else{
    +						e.printStackTrace();
    +					}
    +				}
    +			}
    +		}
    +
    +		public void run() {
    +			System.out.println("Listenning...");
    +			try {
    +				sk = server.accept();
    --- End diff --
    
    It is probably okay to use only one thread (for server and connection), because you are not handling multiple concurrent connections.
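A rough sketch of what a single-threaded server could look like. It uses a plain `Socket` as the client instead of `SocketClientSink` so that it stays self-contained, and all names are hypothetical. The same thread accepts the one expected connection and reads from it.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadServerExample {

    /** Sends one line to a local one-shot server and returns what the server read. */
    public static String sendAndReceive(String message) throws Exception {
        final AtomicReference<String> received = new AtomicReference<>();
        final AtomicReference<Throwable> error = new AtomicReference<>();

        try (ServerSocket server = new ServerSocket(0)) {
            // One thread both accepts the connection and reads from it.
            Thread serverThread = new Thread(() -> {
                try (Socket connection = server.accept();
                     BufferedReader reader = new BufferedReader(new InputStreamReader(
                             connection.getInputStream(), StandardCharsets.UTF_8))) {
                    received.set(reader.readLine());
                } catch (Throwable t) {
                    error.set(t);
                }
            });
            serverThread.start();

            // Stand-in for the sink under test: connect and write a single line.
            try (Socket client = new Socket("127.0.0.1", server.getLocalPort());
                 OutputStream out = client.getOutputStream()) {
                out.write((message + "\n").getBytes(StandardCharsets.UTF_8));
                out.flush();
            }

            serverThread.join();
        }

        if (error.get() != null) {
            throw new Exception("Error in server thread", error.get());
        }
        return received.get();
    }
}
```

A second thread per connection would only be needed if the server had to keep accepting new clients while serving existing ones.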



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-128208126
  
    @rkh 
    Yes, I asked Travis support too.
    They told me that it will hopefully be usable within the next week.
    Thanks for the info.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36714718
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    --- End diff --
    
    Sorry, you may ignore this reply.
    I've got the picture.



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127541271
  
    You're welcome. And yes the streaming API is quite a bit newer than the other code so it is not that well covered with tests. If you want to, you can open a Jira Issue (https://issues.apache.org/jira/browse/FLINK) about the tests, then the community can discuss how we should approach improving the test coverage. That would be very nice. 



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127508946
  
    Hi, I cannot see the CI details.
    Can anyone help?
    The problem looks like this:
    
    ![](https://cloud.githubusercontent.com/assets/13193847/9055495/2e21b840-3abe-11e5-9eb9-b4b2f5ef5fd2.png)
    
    
    
    And the tests I added pass in my local environment:
    ![image](https://cloud.githubusercontent.com/assets/13193847/9055559/d30011d6-3abe-11e5-9d27-134d39df21f0.png)
    
    
    Would you please rerun the CI?



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127534169
  
    Sorry, I'm still a novice at using Git.
    So, what should I do now?
    Should I close the new PR and push the changes here?



[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r37086314
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String access;
    +	private String value;
    +	public SocketServer.ServerThread th;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server;
    +		private Socket sk;
    +		private BufferedReader rdr;
    +
    +		private SocketServer() {
    +			try {
    +				this.server = new ServerSocket(0);
    +				port = server.getLocalPort();
    +			} catch (Exception e) {
    +				error.set(e);
    +			}
    +		}
    +
    +		public void run() {
    +			try {
    +				sk = server.accept();
    +				access = "Connected";
    +				th = new ServerThread(sk);
    +				th.start();
    +			} catch (Exception e) {
    +				error.set(e);
    +			}
    +		}
    +
    +		class ServerThread extends Thread {
    +			Socket sk;
    +
    +			public ServerThread(Socket sk) {
    +				this.sk = sk;
    +			}
    +
    +			public void run() {
    +				try {
    +					rdr = new BufferedReader(new InputStreamReader(sk
    +							.getInputStream()));
    +					value = rdr.readLine();
    +				} catch (IOException e) {
    +					error.set(e);
    +				}
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +
    +		SocketServer server = new SocketServer();
    +		server.start();
    +
    +		SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +			@Override
    +			public byte[] serialize(String element) {
    +				return element.getBytes();
    +			}
    +		};
    +
    +		SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema);
    +		simpleSink.open(new Configuration());
    +		simpleSink.invoke("testSocketSinkInvoke");
    +		simpleSink.close();
    +		try {
    +			server.join();
    +			th.join();
    +		}
    +		catch (Exception e){
    +			Assert.fail(e.getMessage());
    +		}
    +
    +		if (error.get() != null) {
    +			Throwable t = error.get();
    +			t.printStackTrace();
    +			fail("Error in spawned thread: " + t.getMessage());
    +		}
    +
    +		assertEquals(this.access, "Connected");
    --- End diff --
    
    For correct display in error cases, the order should be `assertEquals(expected, actual)`.
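
To illustrate why the argument order matters, here is a minimal sketch that needs no JUnit on the classpath. The class `ArgumentOrderDemo` and the helper `failureMessage` are hypothetical stand-ins that only mimic the general `expected:<...> but was:<...>` shape of a JUnit failure message; they are not JUnit's implementation.

```java
// Hypothetical stand-in for the message built when assertEquals fails.
class ArgumentOrderDemo {

    // Mimics the "expected:<..> but was:<..>" shape; not JUnit's actual code.
    static String failureMessage(Object expected, Object actual) {
        return "expected:<" + expected + "> but was:<" + actual + ">";
    }

    public static void main(String[] args) {
        String access = null; // pretend the server thread never set the field

        // Swapped order, as in assertEquals(this.access, "Connected"):
        // the report claims that null was the expected value.
        System.out.println(failureMessage(access, "Connected"));

        // Intended order, as in assertEquals("Connected", this.access):
        // the report names "Connected" as expected and null as the actual value.
        System.out.println(failureMessage("Connected", access));
    }
}
```

With the swapped order the assertion still fails when it should, but the message describes the wrong value as the expectation, which makes a failing CI log harder to read.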


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-129854454
  
    @StephanEwen 
    Hi, I pushed a new fix for the exception handling.
    Could you please check whether it's correct?
    Also, there is another PR (https://github.com/apache/flink/pull/991) whose CI run failed.
    Since I can't see the CI details yet, could you do me a favor and have a look?


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127569323
  
    OK, I opened the JIRA issue FLINK-2480.


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-132118720
  
    Looks good and CI passes.
    
    Will merge this!


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-131473233
  
    OK, I've done it.
    Thank you!


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r37086274
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.net.ServerSocket;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
    +	private final String host = "127.0.0.1";
    +	private int port;
    +	private String access;
    +	private String value;
    +	public SocketServer.ServerThread th;
    +
    +	public SocketClientSinkTest() {
    +	}
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server;
    +		private Socket sk;
    +		private BufferedReader rdr;
    +
    +		private SocketServer() {
    +			try {
    +				this.server = new ServerSocket(0);
    +				port = server.getLocalPort();
    +			} catch (Exception e) {
    +				error.set(e);
    +			}
    +		}
    +
    +		public void run() {
    +			try {
    +				sk = server.accept();
    +				access = "Connected";
    +				th = new ServerThread(sk);
    +				th.start();
    +			} catch (Exception e) {
    +				error.set(e);
    +			}
    +		}
    +
    +		class ServerThread extends Thread {
    +			Socket sk;
    +
    +			public ServerThread(Socket sk) {
    +				this.sk = sk;
    +			}
    +
    +			public void run() {
    +				try {
    +					rdr = new BufferedReader(new InputStreamReader(sk
    +							.getInputStream()));
    +					value = rdr.readLine();
    +				} catch (IOException e) {
    +					error.set(e);
    +				}
    +			}
    +		}
    +	}
    +
    +	@Test
    +	public void testSocketSink() throws Exception{
    +
    +		SocketServer server = new SocketServer();
    +		server.start();
    +
    +		SerializationSchema<String, byte[]> simpleSchema = new SerializationSchema<String, byte[]>() {
    +			@Override
    +			public byte[] serialize(String element) {
    +				return element.getBytes();
    +			}
    +		};
    +
    +		SocketClientSink<String> simpleSink = new SocketClientSink<String>(host, port, simpleSchema);
    +		simpleSink.open(new Configuration());
    +		simpleSink.invoke("testSocketSinkInvoke");
    +		simpleSink.close();
    +		try {
    +			server.join();
    +			th.join();
    +		}
    +		catch (Exception e){
    +			Assert.fail(e.getMessage());
    --- End diff --
    
    You don't need to catch the exception here; you can let it propagate out of the test.
    If you choose to catch it, you should print it, so the log reveals problems on test failures.


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-129829151
  
    Looks much better. Two more comments:
    
    1. It would be good to print the exception in addition to failing the test, because otherwise the test only fails and gives no indication why. A typical pattern is:
    ```java
    try {
      // some stuff
    } catch (Exception e) {
      e.printStackTrace();
      Assert.fail(e.getMessage());
    }
    ```
    Here, the test prints nothing when working properly, but prints the error when it fails.
    
    2. `Assert.fail()` does not work when used in spawned threads. The reason is that JUnit communicates the failure by throwing an `AssertionError`, which needs to reach the invoking framework. That does not happen if the `fail()` method is called in a spawned thread.
    
    Here is how you can do it. It is a bit clumsy, because you need to forward the exception to the main thread, but it works well:
    ```java
    final AtomicReference<Throwable> error = new AtomicReference<>();
    Thread thread = new Thread("server thread") {
        @Override
        public void run() {
            try {
                doStuff();
            }
            catch (Throwable t) {
                error.set(t);
            }
        }
    };
    thread.start();
    
    // do some test logic
    
    thread.join();
    if (error.get() != null) {
        Throwable t = error.get();
        t.printStackTrace();
        fail("Error in spawned thread: " + t.getMessage());
    }
    ```
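
A self-contained variant of the forwarding pattern, as a sketch that runs without JUnit. The class `SpawnedThreadErrorDemo` and the helper `runAndCollectError` are illustrative names, not part of the pull request; in a real test the returned `Throwable` would be passed to `fail(...)` on the main thread.

```java
import java.util.concurrent.atomic.AtomicReference;

class SpawnedThreadErrorDemo {

    // Runs the task in a new thread and returns whatever it threw, or null.
    static Throwable runAndCollectError(final Runnable task) throws InterruptedException {
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
        Thread thread = new Thread("server thread") {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Throwable t) {
                    error.set(t); // hand the failure over to the calling thread
                }
            }
        };
        thread.start();
        thread.join(); // wait until the spawned thread has finished
        return error.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable t = runAndCollectError(new Runnable() {
            @Override
            public void run() {
                throw new IllegalStateException("simulated server failure");
            }
        });
        // In a JUnit test, fail(...) would be called here, on the main thread.
        System.out.println("Error in spawned thread: "
                + (t == null ? "none" : t.getMessage()));
    }
}
```

The important detail is that the check happens only after `join()`, so the main thread looks at the reference once the spawned thread can no longer change it.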
    



---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127982738
  
    Hi, the CI has passed.
    Can it be merged?


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-128200510
  
    Hi,
    don't worry, and have a nice vacation!
    I'll fix the class documentation and the static ports.
    As for the tests, I was considering case coverage.
    Since the function is public, anyone could use SocketSinkFunction with or without calling invoke.
    I am happy to combine the tests into one if you think the separate case coverage is not necessary.


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36672088
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    +				try {
    +					this.server = new ServerSocket(port);
    +					break;
    +				} catch (Exception e) {
    +					--port;
    +					if (port > 0) {
    +						continue;
    +					}
    +					else{
    +						e.printStackTrace();
    +					}
    +				}
    +			}
    +		}
    +
    +		public void run() {
    +			System.out.println("Listenning...");
    +			try {
    +				sk = server.accept();
    +				access = "Connected";
    +				th = new ServerThread(sk);
    +				th.start();
    +			} catch (Exception e) {
    +				e.printStackTrace();
    --- End diff --
    
    An exception here should cause the test to fail.


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36714696
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    +				try {
    +					this.server = new ServerSocket(port);
    +					break;
    +				} catch (Exception e) {
    +					--port;
    +					if (port > 0) {
    +						continue;
    +					}
    +					else{
    +						e.printStackTrace();
    +					}
    +				}
    +			}
    +		}
    +
    +		public void run() {
    +			System.out.println("Listenning...");
    +			try {
    +				sk = server.accept();
    --- End diff --
    
    Sorry, you may ignore this reply.
    I've got the picture.


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36671992
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    --- End diff --
    
    Explicit null initialization is redundant, because fields default to null anyway, and it causes overhead. That is not a problem here (it is a test), but it is good to avoid it.
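
For context on why the initializer is redundant: Java assigns `null` to every object-reference field by default, so the two fields in this sketch start out with the same value. The class `FieldDefaultsDemo` and its field names are illustrative only.

```java
import java.net.ServerSocket;

class FieldDefaultsDemo {
    ServerSocket implicitDefault;         // implicitly null
    ServerSocket explicitDefault = null;  // same value, redundant initializer

    public static void main(String[] args) {
        FieldDefaultsDemo demo = new FieldDefaultsDemo();
        // Both references are null, so the comparison holds.
        System.out.println(demo.implicitDefault == demo.explicitDefault);
    }
}
```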


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-131001646
  
    @StephanEwen 
    Hi, I made some changes a few days ago.
    Could you take a look again?
    Thank you!


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36707683
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    --- End diff --
    
    Hi, if `ServerSocket(0)` finds a free port, how should I get that port for SocketClientSink, since this class requires a real port?
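
One way to answer this, as a sketch: binding to port 0 lets the operating system pick a free port, and `ServerSocket.getLocalPort()` then reports the port that was actually chosen. The class `EphemeralPortDemo` and the method `roundTrip` are illustrative names, and a plain `Socket` stands in for `SocketClientSink`, so this only demonstrates the port hand-off and not the sink itself.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

class EphemeralPortDemo {

    // Sends one line to a server on an OS-assigned port; returns what the server read.
    static String roundTrip(String message) throws IOException {
        ServerSocket server = new ServerSocket(0); // 0 = let the OS choose a free port
        try {
            int port = server.getLocalPort();      // the port that was actually bound

            // The connection waits in the listen backlog until accept() is called,
            // so a single thread is enough for this small exchange.
            Socket client = new Socket("127.0.0.1", port);
            try {
                PrintWriter out = new PrintWriter(client.getOutputStream(), true);
                out.println(message);
            } finally {
                client.close();
            }

            Socket accepted = server.accept();
            try {
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(accepted.getInputStream()));
                return in.readLine();
            } finally {
                accepted.close();
            }
        } finally {
            server.close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Server received: " + roundTrip("testSocketSinkInvoke"));
    }
}
```

In the test under review, the same idea means creating the `ServerSocket` first, storing `getLocalPort()` in the `port` field, and only then constructing the sink with that value.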


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/977#discussion_r36707458
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/SocketClientSinkTest.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import java.io.IOException;
    +import java.net.Socket;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +
    +import org.junit.Test;
    +
    +import static java.lang.Thread.sleep;
    +import static org.junit.Assert.*;
    +
    +import java.io.BufferedReader;
    +import java.io.InputStreamReader;
    +import java.io.PrintWriter;
    +import java.net.ServerSocket;
    +
    +/**
    + * Tests for the {@link org.apache.flink.streaming.api.functions.sink.SocketClientSink}.
    + */
    +public class SocketClientSinkTest{
    +
    +	private final String host = "127.0.0.1";
    +	private int port = 9999;
    +	private String access;
    +	public SocketServer.ServerThread th = null;
    +
    +	class SocketServer extends Thread {
    +
    +		private ServerSocket server = null;
    +		private Socket sk = null;
    +		private BufferedReader rdr = null;
    +		private PrintWriter wtr = null;
    +
    +		private SocketServer(int port) {
    +			while (port > 0) {
    +				try {
    +					this.server = new ServerSocket(port);
    +					break;
    +				} catch (Exception e) {
    +					--port;
    +					if (port > 0) {
    +						continue;
    +					}
    +					else{
    +						e.printStackTrace();
    +					}
    +				}
    +			}
    +		}
    +
    +		public void run() {
    +			System.out.println("Listenning...");
    +			try {
    +				sk = server.accept();
    --- End diff --
    
    This is preparation for subsequent, more complex tests.
    I am happy to remove this thread if you consider it unnecessary.


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by HuangWHWHW <gi...@git.apache.org>.
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127508217
  
    Hi, I cannot see the CI details.
    Can anyone help?
    The problem looks like this:
    ![image](https://cloud.githubusercontent.com/assets/13193847/9055495/2e21b840-3abe-11e5-9eb9-b4b2f5ef5fd2.png)
    
    
    
    And the tests I added pass in my local environment:
    ![image](https://cloud.githubusercontent.com/assets/13193847/9055495/2e21b840-3abe-11e5-9eb9-b4b2f5ef5fd2.png)
    
    Would you please rerun the CI again?


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by rkh <gi...@git.apache.org>.
Github user rkh commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-128011316
  
    FYI the problems @HuangWHWHW is experiencing with Travis CI stem from our CDN being blocked in China. We're working on a fix.


---

[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-127522856
  
    Hi,
    you can rerun the tests on Travis by changing your commit slightly and re-pushing it to your branch, for example using:
    ```bash
    git commit --amend
    git push -f <your repository> <your branch>
    ```
    
    The "Do you have a question?" message seems to be a Travis problem. Maybe someone else has seen it already? The relevant output from the Travis log seems to be this:
    
    ```
    Running org.apache.flink.streaming.api.functions.SocketClientSinkTest
    Listenning...
    Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.957 sec - in org.apache.flink.streaming.api.PartitionerTest
    Running org.apache.flink.streaming.api.functions.FromElementsFunctionTest
    Info from clients: null
    Return to client!
    Listenning...
    Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.035 sec <<< FAILURE! - in org.apache.flink.streaming.api.functions.SocketClientSinkTest
    testSocketSinkOpen(org.apache.flink.streaming.api.functions.SocketClientSinkTest)  Time elapsed: 0.016 sec  <<< FAILURE!
    org.junit.ComparisonFailure: expected:<[Invok]ed> but was:<[Connect]ed>
    	at org.junit.Assert.assertEquals(Assert.java:115)
    	at org.junit.Assert.assertEquals(Assert.java:144)
    	at org.apache.flink.streaming.api.functions.SocketClientSinkTest.testSocketSinkOpen(SocketClientSinkTest.java:126)
    ```
    
    Let me know if you need any help. 


---