Posted to user@livy.apache.org by kant kodali <ka...@gmail.com> on 2018/05/20 09:36:47 UTC
How to spawn multiple Dstream jobs using jobs?
Hi All,
I feel like I am not able to spawn multiple DStream jobs using Livy, although
I can spawn multiple Structured Streaming jobs. Can anyone tell me whether
spawning multiple DStream jobs with KafkaDirectStream is even possible?
Currently I am able to spawn one DStream job, but when I try to spawn
another one it throws an error because the first one is running. Can
anyone shed some light on this?
Thanks!
Re: How to spawn multiple Dstream jobs using jobs?
Posted by kant kodali <ka...@gmail.com>.
Here is some code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.*;

public class DStreamExample implements Job<String> {

    private String topic;

    public DStreamExample(String topic) {
        this.topic = topic;
    }

    @Override
    public String call(JobContext jobContext) throws Exception {
        // Create the session's streaming context with a 1000 ms batch interval.
        // This is the call that fails when the second job is submitted.
        jobContext.createStreamingContext(1000);
        JavaStreamingContext javaStreamingContext = jobContext.streamingctx();

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", UUID.randomUUID().toString()); // fresh consumer group per job
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        Collection<String> topics = Arrays.asList(topic);

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

        // Print every record value (on the executors) followed by a separator line.
        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
                consumerRecordJavaRDD.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
                    @Override
                    public void call(ConsumerRecord<String, String> stringStringConsumerRecord) throws Exception {
                        System.out.println(stringStringConsumerRecord.value());
                        System.out.println("************************************");
                    }
                });
            }
        });

        // Start streaming; call() returns while the streaming context keeps running in the driver.
        javaStreamingContext.start();
        return "Successful";
    }
}
And here is my main function:
public static void main(String[] args) throws Exception {
    // MyObj and config are my own classes (not shown); gateway.livy is the Livy client.
    MyObj gateway = new MyObj();
    String jarPath = config.getString("livy.jar");
    gateway.livy.addJar(new File(jarPath).toURI()).get();
    System.out.println("hello 0000");
    // Both jobs are submitted through the same client, i.e. to the same Livy session.
    System.out.println(gateway.livy.submit(new DStreamExample("dstream1")).get());
    System.out.println("hello 1111");
    System.out.println(gateway.livy.submit(new DStreamExample("dstream2")).get());
    System.out.println("hello 2222");
}
My dstream1 job gets submitted fine and it works, but when it reaches dstream2
I get the following error:
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.RuntimeException: java.lang.IllegalStateException: Streaming context is not null.
    org.apache.livy.rsc.Utils.checkState(Utils.java:46)
    org.apache.livy.rsc.driver.JobContextImpl.createStreamingContext(JobContextImpl.java:115)
    com.peernova.jobengine.jobs.DStreamExample.call(DStreamExample.java:26)
    com.peernova.jobengine.jobs.DStreamExample.call(DStreamExample.java:17)
    org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:40)
    org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
    org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:57)
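The message in this trace comes from a state check in Livy's `JobContextImpl.createStreamingContext`. The stdlib-only model below is a hypothetical simplification (the class `StreamingSlotModel` and its methods are invented for illustration and are not Livy's code), assuming each Livy session's job context holds at most one streaming context. It reproduces why the second `DStreamExample` submitted through the same client fails, and why a job given its own session, modeled as a separate context object, would not hit the check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a per-session job context that holds at most
// one streaming context. This is NOT Livy's implementation; it only mimics the
// state check seen in the stack trace ("Streaming context is not null.").
public class StreamingSlotModel {

    static final class ModelJobContext {
        private Long batchIntervalMs; // null means "no streaming context created yet"

        synchronized void createStreamingContext(long batchIntervalMs) {
            if (this.batchIntervalMs != null) {
                throw new IllegalStateException("Streaming context is not null.");
            }
            this.batchIntervalMs = batchIntervalMs;
        }
    }

    // "Submits" one DStream job per batch interval; sessions.get(i) is the
    // context that the i-th job runs against.
    static List<String> submitAll(List<ModelJobContext> sessions, long[] intervals) {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < intervals.length; i++) {
            try {
                sessions.get(i).createStreamingContext(intervals[i]);
                results.add("ok");
            } catch (IllegalStateException e) {
                results.add("failed: " + e.getMessage());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        long[] intervals = {1000L, 5000L};

        // Both jobs share one session, like two submits through the same client.
        ModelJobContext shared = new ModelJobContext();
        System.out.println(submitAll(List.of(shared, shared), intervals));
        // -> [ok, failed: Streaming context is not null.]

        // One session per job: each has its own slot and its own batch interval.
        System.out.println(submitAll(List.of(new ModelJobContext(), new ModelJobContext()), intervals));
        // -> [ok, ok]
    }
}
```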
I want to be able to spawn multiple DStream jobs this way, each with its own
batch interval. Is there any workaround for this?
Thanks!
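Spark Streaming allows only one active `StreamingContext` per JVM, each context has a single batch interval, and no new DStreams can be added once it has started. Under those constraints, one possible workaround within a single session (a suggestion, not something confirmed in this thread) is to define all topics in one job, use the greatest common divisor of the desired intervals as the shared batch interval, and apply a `window(...)` to each stream whose window and slide durations equal that stream's desired interval, since both must be multiples of the batch interval. The `BatchIntervalPlan` class below is hypothetical and only does the arithmetic; it makes no Spark calls.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: given the interval each stream "wants" (in ms), pick one
// shared batch interval (their GCD) and the multiple of it that each stream's
// window/slide duration would be. Pure arithmetic; no Spark calls.
public class BatchIntervalPlan {

    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    // Shared batch interval (ms) for all streams in one StreamingContext.
    static long sharedBatchInterval(Map<String, Long> desiredMs) {
        long g = 0;
        for (long ms : desiredMs.values()) {
            if (ms <= 0) throw new IllegalArgumentException("interval must be positive: " + ms);
            g = gcd(g, ms);
        }
        if (g == 0) throw new IllegalArgumentException("no intervals given");
        return g;
    }

    // For each stream, how many shared batches make up its desired interval.
    static Map<String, Long> windowMultiples(Map<String, Long> desiredMs) {
        long batch = sharedBatchInterval(desiredMs);
        Map<String, Long> multiples = new LinkedHashMap<>();
        desiredMs.forEach((stream, ms) -> multiples.put(stream, ms / batch));
        return multiples;
    }

    public static void main(String[] args) {
        Map<String, Long> desired = new LinkedHashMap<>();
        desired.put("dstream1", 1000L);
        desired.put("dstream2", 5000L);
        desired.put("dstream3", 1500L);
        System.out.println(sharedBatchInterval(desired)); // 500
        System.out.println(windowMultiples(desired));     // {dstream1=2, dstream2=10, dstream3=3}
    }
}
```

In the job, the shared value would be passed once to `jobContext.createStreamingContext(...)`, and each stream would then use roughly `stream.map(r -> r.value()).window(Durations.milliseconds(m * batch), Durations.milliseconds(m * batch))` with its multiple `m`, mapping to values first because `ConsumerRecord` is not serializable. The trade-off is that all streams then share one context and stop together; if that coupling is unacceptable, the alternative is one Livy session (and thus one Spark application) per DStream job.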