Posted to user@livy.apache.org by kant kodali <ka...@gmail.com> on 2018/05/20 09:36:47 UTC

How to spawn multiple Dstream jobs using Livy?

Hi All,

I can't seem to spawn multiple dstream jobs using Livy, although I can
spawn multiple structured streaming jobs just fine. Can anyone tell me if
spawning multiple dstream jobs with KafkaDirectStream is even possible?
Currently I am able to spawn one dstream job, but when I try to spawn
another one it throws an error because the first one is still running. Can
anyone shed some light here?
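
From what I understand this may be a Spark limitation rather than a Livy
one: Spark allows only one active StreamingContext per JVM. A plain-Spark
sketch (no Livy involved) that I believe fails the same way; the socket
sources are only there so each context has an output operation:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TwoContexts {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("two-contexts").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaStreamingContext first = new JavaStreamingContext(sc, Durations.seconds(1));
        first.socketTextStream("localhost", 9999).print(); // dummy output op
        first.start(); // fine

        JavaStreamingContext second = new JavaStreamingContext(sc, Durations.seconds(5));
        second.socketTextStream("localhost", 9998).print(); // dummy output op
        second.start(); // IllegalStateException: only one StreamingContext
                        // may be started in this JVM
    }
}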

Thanks!

Re: How to spawn multiple Dstream jobs using Livy?

Posted by kant kodali <ka...@gmail.com>.
Here is some code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.*;

public class DStreamExample implements Job<String> {

    private final String topic;

    public DStreamExample(String topic) {
        this.topic = topic;
    }

    @Override
    public String call(JobContext jobContext) throws Exception {
        // Ask Livy to create the session's StreamingContext (batch duration 1000 ms).
        jobContext.createStreamingContext(1000);
        JavaStreamingContext javaStreamingContext = jobContext.streamingctx();

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", UUID.randomUUID().toString());
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList(topic);

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        javaStreamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Print every record's value on the executors.
        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
                rdd.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
                    @Override
                    public void call(ConsumerRecord<String, String> record) throws Exception {
                        System.out.println(record.value());
                        System.out.println("************************************");
                    }
                });
            }
        });

        javaStreamingContext.start();
        return "Successful";
    }
}


And here is my main function


public static void main(String[] args) throws Exception {
    MyObj gateway = new MyObj();
    String jarPath = config.getString("livy.jar");
    gateway.livy.addJar(new File(jarPath).toURI()).get();

    System.out.println("hello 0000");
    System.out.println(gateway.livy.submit(new DStreamExample("dstream1")).get());
    System.out.println("hello 1111");
    System.out.println(gateway.livy.submit(new DStreamExample("dstream2")).get());
    System.out.println("hello 2222");
}
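
For what it's worth, submit(...).get() unblocks as soon as call() returns,
which is right after javaStreamingContext.start(), so the first stream is
still running inside the shared session driver when dstream2 arrives. The
same flow with the handles kept explicit (JobHandle is the Future-like
type that submit() returns):

import org.apache.livy.JobHandle;

// submit() returns a JobHandle<T>, a java.util.concurrent.Future<T>;
// get() returns as soon as call() does, i.e. right after start(),
// while the started stream stays alive inside the session driver
JobHandle<String> first = gateway.livy.submit(new DStreamExample("dstream1"));
System.out.println(first.get());  // "Successful"; the session now holds a started context

JobHandle<String> second = gateway.livy.submit(new DStreamExample("dstream2"));
System.out.println(second.get()); // throws ExecutionException: the second
                                  // createStreamingContext() finds an existing context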

My dstream1 gets submitted fine and it works, but when it hits dstream2
I get the following error:
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.RuntimeException: java.lang.IllegalStateException: Streaming context is not null.
    org.apache.livy.rsc.Utils.checkState(Utils.java:46)
    org.apache.livy.rsc.driver.JobContextImpl.createStreamingContext(JobContextImpl.java:115)
    com.peernova.jobengine.jobs.DStreamExample.call(DStreamExample.java:26)
    com.peernova.jobengine.jobs.DStreamExample.call(DStreamExample.java:17)
    org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:40)
    org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
    org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:57)
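
If I read the trace correctly, the "Streaming context is not null." message
comes from a checkState call in Livy's JobContextImpl.createStreamingContext,
i.e. the session-wide JobContext holds at most one StreamingContext. A
paraphrase of that invariant (reconstructed from the trace above, not copied
from Livy source):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch of the invariant behind the error: one StreamingContext per session.
class SessionContextSketch {
    private final JavaSparkContext sc;
    private JavaStreamingContext streamingctx;

    SessionContextSketch(JavaSparkContext sc) {
        this.sc = sc;
    }

    synchronized void createStreamingContext(long batchDurationMs) {
        if (streamingctx != null) {
            // the path my second submission hits
            throw new IllegalStateException("Streaming context is not null.");
        }
        streamingctx = new JavaStreamingContext(sc, new Duration(batchDurationMs));
    }
}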

I want to be able to spawn multiple dstream jobs this way, with each job
able to use a different batch interval.

Is there any workaround for this?
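
The only workaround I can think of so far is one Livy session per dstream
job: each session is its own Spark application, so each one can hold its
own StreamingContext with its own batch interval. A rough sketch using the
plain LivyClientBuilder API (the URL and jar path are placeholders for my
setup):

import java.io.File;
import java.net.URI;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

public class OneSessionPerStream {
    public static void main(String[] args) throws Exception {
        String livyUrl = "http://localhost:8998"; // placeholder
        String jarPath = "/path/to/jobs.jar";     // placeholder

        for (String topic : new String[] {"dstream1", "dstream2"}) {
            // one client == one Livy session == one Spark app, so each
            // dstream job gets its own StreamingContext and batch interval
            LivyClient client = new LivyClientBuilder()
                    .setURI(new URI(livyUrl))
                    .build();
            client.addJar(new File(jarPath).toURI()).get();
            System.out.println(client.submit(new DStreamExample(topic)).get());
        }
    }
}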


Thanks!

