Posted to user@spark.apache.org by Yu Wei <yu...@hotmail.com> on 2016/07/06 11:35:52 UTC

It seemed JavaDStream.print() did not work when launching via yarn on a single node

Hi guys,


It seems that when the application is launched via YARN on a single node, JavaDStream.print() does not work. However, it occasionally works.

If I launch the same application in local mode, it always works.


The code is as below:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;

SparkConf conf = new SparkConf().setAppName("Monitor&Control");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> inputDS = MQTTUtils.createStream(jssc, "tcp://114.55.145.185:1883", "Control");
inputDS.print();
jssc.start();
jssc.awaitTermination();


Command for launching via YARN (did not work):
spark-submit --master yarn --deploy-mode cluster --driver-memory 4g --executor-memory 2g target/CollAna-1.0-SNAPSHOT.jar

Command for launching in local mode (works):
spark-submit --master local[4] --driver-memory 4g --executor-memory 2g --num-executors 4 target/CollAna-1.0-SNAPSHOT.jar


Any thoughts about the problem?


Thanks,

Jared


Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

Posted by Yu Wei <yu...@hotmail.com>.
Actually, the Time line was always printed out.

Is there any better method to debug the problem? I want to update the Spark/MQTT code and rebuild to debug further.
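
For reference, one way to look at the driver-side output when running in yarn-cluster mode is to pull the application's aggregated container logs after it stops (this assumes YARN log aggregation is enabled; the application ID below is a placeholder):

yarn logs -applicationId application_1467800000000_0001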


Thanks,

Jared



Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

Posted by Saisai Shao <sa...@gmail.com>.
DStream.print() will collect some of the data to the driver and display it; please see the implementation of DStream.print() below.

RDD.take() will collect some of the data to the driver.

Normally the behavior should be consistent between cluster and local mode; please find out the root cause of this problem, such as the MQTT connection or something else.

def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
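
In Java terms, what print() does could be sketched roughly as follows (illustrative only; inputDS is the stream from the first mail, and 10 is print()'s default element count):

inputDS.foreachRDD((rdd, time) -> {
    // take() ships at most the first elements back to the driver,
    // so the println calls below run in the driver process
    java.util.List<String> first = rdd.take(10);
    System.out.println("Time: " + time);
    for (String s : first) {
        System.out.println(s);
    }
});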



Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

Posted by Yu Wei <yu...@hotmail.com>.
How about DStream.print()?

Does it invoke collect before printing on the driver?

Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

Posted by Sean Owen <so...@cloudera.com>.
dstream.foreachRDD(_.collect.foreach(println))



Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

Posted by Rabin Banerjee <de...@gmail.com>.
Collect will help then. Maybe something like this:

foreachRDD(rdd => { for (item <- rdd.collect()) println(item) })


Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

Posted by Sean Owen <so...@cloudera.com>.
That still causes the elements to be printed on the remote executors, not the driver. You'd have to collect the RDD and then println, really. Also see DStream.print().
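
In Java, that suggestion might look like the following sketch (inputDS is the stream from the first mail; note that collect() pulls the whole batch back to the driver, so it is only reasonable for small batches):

inputDS.foreachRDD(rdd -> {
    // collect() returns the batch contents to the driver,
    // so the println output shows up in the driver's stdout
    for (String item : rdd.collect()) {
        System.out.println(item);
    }
});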



Re: It seemed JavaDStream.print() did not work when launching via yarn on a single node

Posted by Rabin Banerjee <de...@gmail.com>.
It's not working because you haven't collected the data.

Try something like:

dstream.foreachRDD(rdd => rdd.foreach(println))

Thanks,
Rabin

