Posted to user@spark.apache.org by kytay <ka...@gmail.com> on 2014/07/10 11:47:22 UTC

Getting Persistent Connection using socketStream?

Hi

I am trying out a simple piece of code by writing my own JavaNetworkCount
app to test out Spark Streaming.

So here are the two sets of code.


// #1
JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1", 9999);


// #2
JavaReceiverInputDStream<String> lines = sctx.socketStream(
        "127.0.0.1",
        9999,
        new Function<InputStream, Iterable<String>>() {

            @Override
            public Iterable<String> call(InputStream arg0) throws Exception {
                // TODO Auto-generated method stub
                if (arg0 != null)
                    System.out.println("CALL is called...");

                BufferedReader reader = new BufferedReader(new InputStreamReader(arg0));
                ArrayList<String> list = new ArrayList<String>();
                while (reader.ready()) {
                    String linetext = reader.readLine();
                    System.out.println(linetext);
                    list.add(linetext);
                }

                if (list.size() > 0)
                    System.out.println("ArrayList is not empty.");

                return list;
            }

        },
        StorageLevel.MEMORY_AND_DISK_SER_2()
);

I am writing #2 to test out some other issues that I am facing, where
the text stream from the TCP host is not received, but this is not my first
concern.

What I am concerned about is this:

Using .socketTextStream(), the code manages to keep a persistent connection
to the TCP host, while with #2, using .socketStream(), I am unable to
maintain a persistent connection.

The following is the log printed when I run #2:

14/07/10 01:55:42 INFO ReceiverSupervisorImpl: Receiver started again
14/07/10 01:55:43 INFO SocketReceiver: Connected to 127.0.0.1:9999
CALL is called...
14/07/10 01:55:43 INFO SocketReceiver: Stopped receiving
14/07/10 01:55:43 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to 127.0.0.1:9999
14/07/10 01:55:43 INFO SocketReceiver: Closed socket to 127.0.0.1:9999
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to 127.0.0.1:9999:
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Called receiver onStop
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Deregistering receiver 0
14/07/10 01:55:43 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to 127.0.0.1:9999
14/07/10 01:55:43 INFO ReceiverSupervisorImpl: Stopped receiver 0
14/07/10 01:55:45 INFO ReceiverTracker: Stream 0 received 0 blocks
14/07/10 01:55:45 INFO JobScheduler: Added jobs for time 1404984705000 ms
14/07/10 01:55:45 INFO ReceiverSupervisorImpl: Starting receiver again


I am very new to clustered computing, Hadoop, Spark, and even streaming, so I
may not have the entire concept right.

So may I clarify: is there something wrong in my #2 code? Am I able to achieve
the same thing that #1 does?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Getting-Persistent-Connection-using-socketStream-tp9285.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Getting Persistent Connection using socketStream?

Posted by Tathagata Das <ta...@gmail.com>.
Right, this uses NextIterator, which is elsewhere in the repo. It just makes
it cleaner to implement a custom iterator. But I guess you got the high-level
point, so it's okay.
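
For readers more comfortable with Java, the idea behind NextIterator can be
sketched roughly as below. This is an illustrative analogue, not Spark's actual
class: SimpleNextIterator, NextIteratorDemo and lines(...) are hypothetical
names, and the demo reads from a StringReader rather than a socket. The
subclass only supplies getNext() and close(); the base class does the
look-ahead bookkeeping for hasNext() and next().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Demo: a line iterator built on the hypothetical base class below. */
final class NextIteratorDemo {

    /** Rough Java rendering of the bytesToLines idea for any BufferedReader. */
    static Iterator<String> lines(final BufferedReader reader) {
        return new SimpleNextIterator<String>() {
            @Override
            protected String getNext() {
                try {
                    String line = reader.readLine(); // blocks until a line or end of stream
                    if (line == null) {
                        finished = true;             // signal that there is nothing more
                    }
                    return line;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            protected void close() {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> it = lines(new BufferedReader(new StringReader("one\ntwo\n")));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}

/** Hypothetical helper modeled loosely on the NextIterator idea; not Spark code. */
abstract class SimpleNextIterator<T> implements Iterator<T> {
    private T nextValue;
    private boolean gotNext = false;
    private boolean closed = false;
    protected boolean finished = false;

    /** Return the next element, or set finished = true when none is left. */
    protected abstract T getNext();

    /** Release underlying resources; invoked once when the iterator finishes. */
    protected abstract void close();

    @Override
    public boolean hasNext() {
        if (!finished && !gotNext) {
            nextValue = getNext();
            if (finished) {
                if (!closed) {
                    closed = true;
                    close();
                }
            } else {
                gotNext = true;
            }
        }
        return !finished;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException("End of stream");
        }
        gotNext = false;
        return nextValue;
    }
}
```

Calling hasNext() repeatedly does not skip elements, because the value read
ahead is held until next() consumes it.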

TD



Re: Getting Persistent Connection using socketStream?

Posted by kytay <ka...@gmail.com>.
Hi TD

Thanks. 

I have a problem understanding the code on GitHub, object
SocketReceiver.bytesToLines(...)
<https://github.com/apache/spark/blob/095b5182536a43e2ae738be93294ee5215d86581/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala>  

private[streaming]
object SocketReceiver  {

  /**
   * This methods translates the data from an inputstream (say, from a socket)
   * to '\n' delimited strings and returns an iterator to access the strings.
   */
  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
    new NextIterator[String] {
      protected override def getNext() = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close() {
        dataInputStream.close()
      }
    }
  }
}

Sorry, I will need some time to digest this. I do not know Scala at the moment.
But I understand what you mean about the implementation. Thanks.




Re: Getting Persistent Connection using socketStream?

Posted by Tathagata Das <ta...@gmail.com>.
The implementation of the input-stream-to-iterator function in #2 is
incorrect. The function should be such that, when hasNext is called on
the iterator, it tries to read from the buffered reader. If an object
(that is, a line) can be read, it returns it; otherwise it blocks and waits
for data to become available. See the function used for socketTextStream
<https://github.com/apache/spark/blob/095b5182536a43e2ae738be93294ee5215d86581/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala>
.

Instead, what your code is doing is trying to read as much data as it can from
the input stream in one shot (which may hold very little data), putting it into
the list, and then returning the iterator of the list. As soon as the list is
consumed (that is, iterator.hasNext == false), the socket receiver assumes
that there is no more data and thus stops on its own.
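
A minimal Java sketch of the difference described above, assuming nothing
beyond the JDK. SocketLines, eagerLines and blockingLines are hypothetical
names, and a piped stream stands in for the TCP socket. eagerLines mirrors the
#2 approach; blockingLines returns an Iterable whose hasNext() blocks in
readLine() until a line or end of stream arrives.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helper (not part of Spark): two ways to turn a stream into lines. */
final class SocketLines {

    /** Eager version, like #2: collects only what is already buffered right now. */
    static List<String> eagerLines(InputStream in) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        List<String> lines = new ArrayList<>();
        while (reader.ready()) {              // false whenever no data is waiting
            String line = reader.readLine();
            if (line == null) {
                break;
            }
            lines.add(line);
        }
        return lines;
    }

    /** Lazy version: the iterator waits for data instead of giving up early. */
    static Iterable<String> blockingLines(InputStream in) {
        final BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return () -> new Iterator<String>() {
            private String nextLine;          // line read ahead by hasNext()
            private boolean finished;         // true once end of stream was seen

            @Override
            public boolean hasNext() {
                if (nextLine == null && !finished) {
                    try {
                        nextLine = reader.readLine();   // blocks until data or EOF
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    finished = (nextLine == null);
                }
                return nextLine != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String line = nextLine;
                nextLine = null;
                return line;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        // Nothing has been written yet, so the eager version has nothing to collect.
        System.out.println("eager: " + eagerLines(in));

        // A writer thread sends data after a short delay, like a slow TCP peer.
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(200);
                out.write("hello\nworld\n".getBytes(StandardCharsets.UTF_8));
                out.close();
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();

        for (String line : blockingLines(in)) {
            System.out.println("blocking: " + line);
        }
        writer.join();
    }
}
```

An Iterable of this shape is what the function passed to socketStream in #2
would presumably return instead of a pre-filled list. That wiring is not shown
here and has not been tested against Spark.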

Note to self - document this better.

TD

