Posted to user@spark.apache.org by kytay <ka...@gmail.com> on 2014/07/11 04:41:20 UTC

Streaming. Cannot get socketTextStream to receive anything.

Hi

I am learning Spark Streaming and am trying out the JavaNetworkWordCount
example.


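#1 - This is the code I wrote
import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// "local" means a single thread, which the socket receiver occupies.
JavaStreamingContext sctx = new JavaStreamingContext("local", appName,
        new Duration(5000));
JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1", 9999);

// call() runs once per received record, only after sctx.start(), and only for
// batches consumed by an output operation such as words.print().
JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws Exception {
                System.out.println("Print text:" + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });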

#2 - This is the socket server code I am using
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class TestTcpServer {

    public static void main(String argv[]) throws Exception {
        String clientSentence;
        String capitalizedSentence;
        ServerSocket welcomeSocket = new ServerSocket(9999);

        int i = 0;

        while (true) {
            // Wait for a client (e.g. the Spark receiver) to connect.
            Socket connectionSocket = welcomeSocket.accept();
            BufferedReader inFromClient = new BufferedReader(
                    new InputStreamReader(connectionSocket.getInputStream()));
            DataOutputStream outToClient =
                    new DataOutputStream(connectionSocket.getOutputStream());

            while (true) {
                String sendingStr = "Sending... data... " + i;
                // No "\n" is appended, so a line-based receiver never sees the end of a record.
                outToClient.writeBytes(sendingStr);
                System.out.println(sendingStr);
                i++;
                Thread.sleep(3000);
            }
        }
    }
}

What I am trying to do is get the code in #1 to print all the text it
receives, but so far I have not managed to.

I have been using Hercules Setup
<http://www.hw-group.com/products/hercules/details_en.html> to simulate a
TCP server, as well as the simple ServerSocket code in #2, but I do not see
any text printed on the console.

Is public Iterable<String> call(String arg0) being called every 5 seconds?

The console log is at http://pastebin.com/THzdzGhg



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by Tathagata Das <ta...@gmail.com>.
You will have to define your own stream-to-iterator function and use
socketStream instead. As bytes continuously come in, the function should
return objects delimited by your custom delimiter; when there is not yet
enough data for the next object, the function should block.

TD
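TD's suggestion can be sketched as a converter that reads the raw input stream and yields one record per delimiter. The sketch below is a minimal, stdlib-only illustration, assuming a single-byte delimiter such as ETX (0x03) and UTF-8 text; the class DelimitedRecords and its constants are hypothetical, not part of Spark. Its iterator blocks inside InputStream.read() until the next delimiter (or end of stream) arrives, which is the blocking behavior TD describes.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stream-to-iterator converter: splits a byte stream into UTF-8 String
// records separated by a single delimiter byte instead of '\n'.
class DelimitedRecords implements Iterable<String> {
    static final int SOH = 0x01; // start of heading
    static final int ETX = 0x03; // end of text

    private final InputStream in;
    private final int delimiter;

    DelimitedRecords(InputStream in, int delimiter) {
        // Buffering avoids one system call per byte; read() still returns as soon as
        // any bytes are available, so records are not held back.
        this.in = new BufferedInputStream(in);
        this.delimiter = delimiter;
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            private String pending;   // record read ahead by hasNext()
            private boolean finished; // set once the stream has ended

            @Override
            public boolean hasNext() {
                if (pending == null && !finished) {
                    pending = readRecord(); // blocks until a delimiter or end of stream
                    finished = (pending == null);
                }
                return pending != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String record = pending;
                pending = null;
                return record;
            }
        };
    }

    // Returns the next record, or null at end of stream. Bytes left over without a
    // trailing delimiter are returned as a final record when the stream closes.
    private String readRecord() {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try {
            int b;
            while ((b = in.read()) != -1) {
                if (b == delimiter) {
                    return new String(buf.toByteArray(), StandardCharsets.UTF_8);
                }
                buf.write(b);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.size() > 0 ? new String(buf.toByteArray(), StandardCharsets.UTF_8) : null;
    }
}
```

Assuming the Spark 1.x Java API, where JavaStreamingContext.socketStream(hostname, port, converter, storageLevel) takes the converter as a Function<InputStream, Iterable<T>>, the converter's call(InputStream in) would simply return new DelimitedRecords(in, DelimitedRecords.ETX). Check that signature against the Spark version in use before relying on it.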
On Jul 23, 2014 6:52 PM, "kytay" <ka...@gmail.com> wrote:


Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by kytay <ka...@gmail.com>.
Hi TD

You are right, I did not end the strings I sent with "\n". That was the
reason.

Is there a way for me to define the delimiter, such as SOH or ETX instead of
"\n"?

Regards
kytay




Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by Tathagata Das <ta...@gmail.com>.
When you send messages with your simple socket code, are those messages
"\n"-delimited? If not, the socketTextStream receiver won't identify them as
separate events and will keep buffering them.

TD
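Applied to the TestTcpServer code in #2, TD's diagnosis means terminating every message with "\n". The sketch below is a reworked version under that assumption; the class name LineServer and the helper sendLine are hypothetical, and it keeps the original port 9999 and 3-second interval. Since DataOutputStream over a socket stream does no buffering of its own, the missing delimiter rather than a missing flush is the likely cause; the explicit flush() is just a harmless safeguard.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical rework of TestTcpServer: every message ends with '\n',
// so a line-based receiver such as socketTextStream can split it.
class LineServer {
    // Writes one message followed by the '\n' delimiter and pushes it out immediately.
    static void sendLine(OutputStream out, String message) throws IOException {
        out.write((message + "\n").getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocket welcomeSocket = new ServerSocket(9999)) {
            int i = 0;
            while (true) {
                // Serve one client (the Spark receiver) at a time; wait for the next if it disconnects.
                try (Socket connection = welcomeSocket.accept()) {
                    OutputStream out = connection.getOutputStream();
                    while (true) {
                        String sendingStr = "Sending... data... " + i++;
                        sendLine(out, sendingStr);
                        System.out.println(sendingStr);
                        Thread.sleep(3000);
                    }
                } catch (IOException e) {
                    System.out.println("Client disconnected: " + e.getMessage());
                }
            }
        }
    }
}
```

With this server running, socketTextStream("127.0.0.1", 9999) should receive one record per message, the same way it does when lines are typed into "nc -lk 9999".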


On Sun, Jul 13, 2014 at 10:49 PM, kytay <ka...@gmail.com> wrote:


Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by kytay <ka...@gmail.com>.
Hi Tobias

I have been using "local[4]" to test.
My problem is likely caused by the TCP host server that I am trying to
emulate to send out messages (although I am not sure at the moment :D).

The first way I tried was a TCP tool called Hercules.

The second way was to write simple socket code that sends a message at
intervals, like the one shown in #2 of my first post. I suspect it doesn't
work because the messages are not "flushed", so no message is received by
Spark Streaming.

I think I need to do more testing to understand the behavior. I am
currently not sure why "nc -lk" works while the other tools and code I am
testing with do not.

Regards.






Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

I experienced exactly the same problem when using SparkContext with the
"local[1]" master specification. One thread is used for receiving data and
the others for processing, so with only one thread running, no processing
takes place. Only once you shut down the connection is the receiver thread
freed up for processing.

Any chance you run into the same issue?

Tobias
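Tobias's point can be turned into a quick sanity check. The helper below is hypothetical (it is not part of Spark); it only understands the master forms "local", "local[N]" and "local[*]", and it assumes each socket receiver permanently occupies one thread, as described above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Spark): checks that a local master URL leaves at
// least one thread for processing once every receiver holds a thread of its own.
class LocalMasterCheck {
    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d+|\\*)\\]");

    // "local" = 1 thread, "local[N]" = N threads, "local[*]" = one per available core.
    static int localThreads(String master) {
        if (master.equals("local")) {
            return 1;
        }
        Matcher m = LOCAL_N.matcher(master);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported master URL: " + master);
        }
        String n = m.group(1);
        return n.equals("*") ? Runtime.getRuntime().availableProcessors() : Integer.parseInt(n);
    }

    // Each receiver (one per socketTextStream) is assumed to occupy one thread.
    static boolean leavesThreadForProcessing(String master, int numReceivers) {
        return localThreads(master) > numReceivers;
    }

    public static void main(String[] args) {
        for (String master : new String[] {"local", "local[1]", "local[2]", "local[4]"}) {
            System.out.println(master + " with 1 receiver: "
                + (leavesThreadForProcessing(master, 1) ? "OK" : "no thread left for processing"));
        }
    }
}
```

For example, "local[4]" with one socket receiver leaves three threads for processing, while "local" (as in the code from the first post) leaves none.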



On Mon, Jul 14, 2014 at 11:45 AM, kytay <ka...@gmail.com> wrote:


Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by kytay <ka...@gmail.com>.
Hi Akhil Das

Thanks.

I tried the code and it works.

There's a problem with my socket code: it is not flushing the content out.
With the test tool, Hercules, I have to close the socket connection to
"flush" the content out.

I am going to troubleshoot why nc works while my code and the test tool don't.
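Why closing the Hercules connection appeared to "flush" the content can be illustrated with a line reader. This is a minimal sketch, assuming the receiver splits bytes into records roughly the way BufferedReader.readLine() does (a record ends at "\n", "\r", "\r\n" or end of input); the class name LineSplitDemo is hypothetical. Without a delimiter, everything received is one unfinished record, which is only released when the input ends.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Illustration only: splits received text into records with BufferedReader.readLine(),
// assumed here to approximate how a "\n"-delimited text receiver behaves.
class LineSplitDemo {
    static List<String> readAllLines(String received) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(received))) {
            String line;
            // Over a live socket, readLine() blocks until a terminator or end of stream arrives.
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Messages as sent by TestTcpServer: no delimiter, so they merge into one record
        // that only becomes available once the input ends (the connection closes).
        System.out.println(readAllLines("Sending... data... 0Sending... data... 1"));
        // prints [Sending... data... 0Sending... data... 1]

        // The same messages terminated with "\n": two separate records.
        System.out.println(readAllLines("Sending... data... 0\nSending... data... 1\n"));
        // prints [Sending... data... 0, Sending... data... 1]
    }
}
```

This is consistent with "nc -lk" working: nc sends each typed line with its trailing newline.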






Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Can you try this piece of code?

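import java.util.regex.Pattern;

import com.google.common.collect.Lists;
import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class JavaNetworkWordCount {
  // Splits each received line on single spaces.
  private static final Pattern SPACE = Pattern.compile(" ");

  // Usage: JavaNetworkWordCount <hostname> <port>
  public static void main(String[] args) throws Exception {
    // No master is set here: pass one with at least 2 threads (e.g. --master local[2]),
    // because the socket receiver permanently occupies one thread.
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

    // Receives "\n"-delimited lines of text from <hostname>:<port>.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    // Spark 1.x API; since Spark 2.0, FlatMapFunction.call returns an Iterator instead.
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    // print() is the output operation that triggers processing of every batch.
    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
  }
}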


Taken from
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java



Thanks
Best Regards


On Fri, Jul 11, 2014 at 9:58 AM, kytay <ka...@gmail.com> wrote:


Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by kytay <ka...@gmail.com>.
I think I should be seeing every line of text that I type into nc.




Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by kytay <ka...@gmail.com>.
Hi Akhil Das

I have tried the
nc -lk 9999
command too.

I was hoping that "System.out.println("Print text:" + arg0);" would print
when the stream is processed, i.e. when lines.flatMap(...) is called.

But in my test with "nc -lk 9999", nothing is printed on the console at
all.

==

To check whether the "nc" tool itself works, I also tested it with the
Hercules TCP client test tool, and it works fine.

So now the question goes back to why

JavaDStream<String> words =lines.flatMap(
		new FlatMapFunction<String, String>() {
			@Override
			public Iterable<String> call(String arg0) throws Exception {
				
				System.out.println("Print text:" + arg0);
				return Arrays.asList(arg0.split(" "));
			}
		});

is not printing the text I am sending through "nc -lk 9999".

===

Is there any other way to test if socketTextStream(...) is working?

Regards.
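One way to check what a server actually sends, independently of Spark, is to connect to it as a plain TCP client (the role socketTextStream's receiver plays) and print every complete line that arrives. The sketch below is a minimal, stdlib-only probe; the class LineProbe and its arguments are hypothetical. If it prints nothing while the server claims to be sending, the messages are probably not terminated with "\n".

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical test client: connects to the server as a TCP client, like the
// socketTextStream receiver, and prints each complete line it receives.
class LineProbe {
    static List<String> readLines(String host, int port, int maxLines) throws IOException {
        List<String> received = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            // readLine() blocks until a line terminator arrives or the server closes the connection.
            while (received.size() < maxLines && (line = in.readLine()) != null) {
                System.out.println("Received line: " + line);
                received.add(line);
            }
        }
        return received;
    }

    // Usage: java LineProbe <host> <port>, e.g. java LineProbe 127.0.0.1 9999
    public static void main(String[] args) throws IOException {
        readLines(args[0], Integer.parseInt(args[1]), Integer.MAX_VALUE);
    }
}
```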




Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Sorry, the command is

nc -lk 12345

Thanks
Best Regards


On Fri, Jul 11, 2014 at 6:46 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:


Re: Streaming. Cannot get socketTextStream to receive anything.

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
You can simply use the *nc* command for this, like:

nc -p 12345

This will open port 12345, and from the terminal you can type whatever
input your streaming code requires.

Thanks
Best Regards


On Fri, Jul 11, 2014 at 2:41 AM, kytay <ka...@gmail.com> wrote:
