Posted to user@livy.apache.org by kant kodali <ka...@gmail.com> on 2017/12/06 02:50:41 UTC

How to submit a streaming query that runs forever using Livy?

Hi All,

I have the following classes. *All my questions are in bold below*, but I
will also add more context here. I just want to fire and forget a streaming
job. The job reads from Kafka, applies some transformations, and writes back
to Kafka, but for the sake of testing I am writing the output to the console
in the code below.

I want my streaming job to run forever, but the problem is that when I submit
the jar using LivyClient and call .get(), it gets stuck there forever. The
goal is to launch as many streaming jobs as possible, each of which should in
theory run forever.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StreamingJob implements Job<Void> {

  public StreamingJob() {}

  @Override
  public Void call(JobContext ctx) throws Exception {
    Config config = ConfigFactory.load();
    SparkSession sparkSession = ctx.sparkSession();
    // *This app name is not getting set when I go to
    // http://localhost:8998/sessions*
    // *I can see my query but Appid is always set to null*
    ctx.sc().getConf().setAppName("MyJob");

    System.out.println("READING STREAM");

    Dataset<Row> df = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers",
            config.getString("kafka.consumer.settings.bootstrapServers"))
        .load();

    df.printSchema(); // *Where do these print statements go?*

    Dataset<Row> resultSet = df.map(/* some code */);
    StreamingQuery streamingQuery = resultSet.writeStream()
        .format("console")
        .start();
    // *This thing blocks forever and I don't want to set a timeout,*
    // *so what should I do to fire and forget a streaming job?*
    streamingQuery.awaitTermination();
    return null;
  }
}



import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

public class FireStreamingJob {
  private Config config = ConfigFactory.load();
  private LivyClient livy;

  public FireStreamingJob() throws IOException, URISyntaxException {
    StringBuilder sb = new StringBuilder("http://");
    sb.append(this.config.getString("livy.host"));
    sb.append(":");
    sb.append(this.config.getString("livy.port"));
    String livyUrl = sb.toString();
    this.livy = new LivyClientBuilder()
        .setURI(new URI(livyUrl))
        .build();
  }

  public static void main(String[] args) throws ExecutionException,
      InterruptedException, IOException, URISyntaxException {

    FireStreamingJob engine = new FireStreamingJob();

    for (String jarPath : args) {
      engine.livy.uploadJar(new File(jarPath)).get();

      try {
        // *This will block forever*
        engine.livy.submit(new StreamingJob()).get();

        // *The control will never get here so I can't submit another job.*
        System.out.println("SUBMITTED JAR!");
      } finally {
        engine.livy.stop(true);
      }
    }
  }
}

Re: How to submit a streaming query that runs forever using Livy?

Posted by kant kodali <ka...@gmail.com>.
@Meisam Fathi

1. I am using Spark Standalone mode, so there is no YARN anywhere. How do I
set the Appid in Standalone mode?
2. In all the streaming jobs I have written so far, I always call .start()
and .awaitTermination(). Are you saying .awaitTermination() is not needed
with Livy?
  When I looked at the Livy test
<https://github.com/apache/incubator-livy/blob/d4bd76f09690079c47364b3349f549e32db4d621/examples/src/main/scala/org/apache/livy/examples/WordCountApp.scala#L106>
it does look like it is calling awaitTermination(), but with a timeout. In
my case there is no timeout because I want the query to run forever. It
blocks forever because I am calling .get() on a Java Future, which is a
blocking call, and it never returns because I have awaitTermination(). If I
remove .get() and just have livy.submit(new StreamingJob()), this seems to
work fine, and I don't need the return value because I am returning Void
anyway. So the question now really is:
  Do I need to call awaitTermination() or not? I can say for sure that with
spark-submit, a streaming job with .start() alone won't work; it must have
awaitTermination().
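
To make the blocking behaviour concrete, here is a minimal sketch that uses
only java.util.concurrent rather than the Livy client. It rests on some
assumptions: an ExecutorService stands in for the Livy session, a
CountDownLatch stands in for awaitTermination(), and the names
FireAndForgetSketch, submitAll and stillRunning are made up for
illustration. It only shows that submitting without .get() returns
immediately, while a timed get() on a never-ending job just times out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FireAndForgetSketch {

  /** Submits `count` never-ending jobs and returns their handles without calling get(). */
  static List<Future<Void>> submitAll(ExecutorService pool, int count, CountDownLatch stop) {
    List<Future<Void>> handles = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      Callable<Void> job = () -> {
        stop.await(); // stand-in for streamingQuery.awaitTermination()
        return null;
      };
      handles.add(pool.submit(job)); // returns immediately; nothing blocks here
    }
    return handles;
  }

  /** Returns true if a short timed get() gives up, i.e. the job has not finished. */
  static boolean stillRunning(Future<Void> handle) {
    try {
      handle.get(100, TimeUnit.MILLISECONDS);
      return false; // the job finished
    } catch (TimeoutException e) {
      return true;  // still blocked in its stand-in awaitTermination()
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newCachedThreadPool();
    CountDownLatch stop = new CountDownLatch(1);

    List<Future<Void>> handles = submitAll(pool, 3, stop);
    System.out.println("submitted " + handles.size() + " jobs without blocking");
    System.out.println("first job still running: " + stillRunning(handles.get(0)));

    stop.countDown(); // release the stand-in jobs so the JVM can exit
    pool.shutdown();
  }
}
```

In this model an untimed get() on any of the handles would hang in the same
way livy.submit(...).get() does for me; whether the real Livy session keeps
the query alive once call() returns is exactly what I am asking.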

Please let me know.

Thanks!





On Wed, Dec 6, 2017 at 9:03 AM, Meisam Fathi <me...@gmail.com> wrote:

> Please find some of the answers inline.
>
>>     SparkSession sparkSession = ctx.sparkSession();
>>
>>     ctx.sc().getConf().setAppName("MyJob"); // *This app name is not getting set when I go http://localhost:8998/sessions*
>>
> By this time the Spark session has already been created. You should set
> the configs before starting the SparkContext.
>
>
>>
>> *                                       // I can see my query but Appid is always set to null*
>>
> The AppId should be given to Livy by YARN (if you are running on YARN). It
> may take a while to get a response if YARN is busy. If you are not getting
> an appID at all, then your application was not submitted correctly. You may
> want to check your cluster manager UI for more information.
>
>
>>
>> *    System.out.println("READING STREAM");*
>>
> This will be executed on the driver node. If you are running Spark in
> standalone mode or in client mode, the driver node is the same node that
> runs Spark. If you are running Spark in cluster mode, the driver is a
> random node on the cluster assigned by YARN.
>
>
>>
>>
>>     df.printSchema(); // *Where does these print statements go ?*
>>
>>
>>
> Same as above.
>
>
>> awaitTermination(); // *This thing blocks forever and I don't want to set a timeout. *
>>
>>
>> *                                      // so what should I do to fire and forget a streaming job ? *
>>
> I believe you can just call .start().
>
>>         livy.submit(new StreamingJob()).get(); // *This will block forever*
>>
> Livy.submit.get returns a value only if the job succeeds. You may want to
> use onJobFailed(JobHandle<T> job, Throwable cause) as well to handle errors
> and get a better idea why the job is not returning.
>
>
>
>>         System.out.println("SUBMITTED JAR!");  // *The control will never get here so I can't submit another job.*
>>
> See above.
>
> Thanks,
> Meisam
>

Re: How to submit a streaming query that runs forever using Livy?

Posted by Meisam Fathi <me...@gmail.com>.
Please find some of the answers inline.

>     SparkSession sparkSession = ctx.sparkSession();
>
>     ctx.sc().getConf().setAppName("MyJob"); // *This app name is not getting set when I go http://localhost:8998/sessions*
>
By this time the Spark session has already been created. You should set the
configs before starting the SparkContext.


>
> *                                       // I can see my query but Appid is always set to null*
>
The AppId should be given to Livy by YARN (if you are running on YARN). It
may take a while to get a response if YARN is busy. If you are not getting an
appID at all, then your application was not submitted correctly. You may
want to check your cluster manager UI for more information.


>
> *    System.out.println("READING STREAM");*
>
This will be executed on the driver node. If you are running Spark in
standalone mode or in client mode, the driver node is the same node that
runs Spark. If you are running Spark in cluster mode, the driver is a
random node on the cluster assigned by YARN.


>
>
>     df.printSchema(); // *Where does these print statements go ?*
>
>
>
Same as above.


> awaitTermination(); // *This thing blocks forever and I don't want to set a timeout. *
>
>
> *                                      // so what should I do to fire and forget a streaming job ? *
>
I believe you can just call .start().

>         livy.submit(new StreamingJob()).get(); // *This will block forever*
>
Livy.submit.get returns a value only if the job succeeds. You may want to
use onJobFailed(JobHandle<T> job, Throwable cause) as well to handle errors
and get a better idea why the job is not returning.
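
As a rough illustration of the callback idea, here is a sketch that does not
use Livy at all. It assumes a plain JDK CompletableFuture as a stand-in for
the job handle, with whenComplete playing the role that onJobFailed and
onJobSucceeded would play on a listener. The class name ListenerSketch, the
watch helper and the query names are hypothetical. The point is only that
the caller registers a callback and moves on instead of blocking in get().

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ListenerSketch {

  /** Registers success and failure callbacks on a handle and returns immediately. */
  static void watch(String name, CompletableFuture<Void> handle, Queue<String> sink) {
    handle.whenComplete((result, cause) -> {
      if (cause != null) {
        // plays the role of onJobFailed(job, cause)
        sink.add(name + " failed: " + cause.getMessage());
      } else {
        // plays the role of onJobSucceeded(job, result)
        sink.add(name + " succeeded");
      }
    });
  }

  public static void main(String[] args) {
    Queue<String> events = new ConcurrentLinkedQueue<>();

    // A handle that never completes, like a streaming query that keeps running.
    CompletableFuture<Void> running = new CompletableFuture<>();
    // A handle that will fail shortly after submission.
    CompletableFuture<Void> broken = new CompletableFuture<>();

    watch("running-query", running, events);
    watch("broken-query", broken, events);

    // Nothing has blocked so far; the failure is reported through the callback.
    broken.completeExceptionally(new IllegalStateException("kafka unreachable"));

    System.out.println(events);
    System.out.println("running-query done: " + running.isDone());
  }
}
```

With this pattern a job that never returns stays silent, while a job that
dies reports its cause, which is usually what you want to know about a
long-running stream.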



>         System.out.println("SUBMITTED JAR!");  // *The control will never get here so I can't submit another job.*
>
See above.

Thanks,
Meisam