You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/20 12:42:30 UTC
cxf git commit: Optional support for the paralellized collections in
the spark demo
Repository: cxf
Updated Branches:
refs/heads/master 0af65a4ab -> 629af817f
Optional support for the paralellized collections in the spark demo
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/629af817
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/629af817
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/629af817
Branch: refs/heads/master
Commit: 629af817f4762a12623e76fb7f7baefe98482719
Parents: 0af65a4
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 20 13:42:15 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 20 13:42:15 2016 +0100
----------------------------------------------------------------------
.../src/main/java/demo/jaxrs/server/Server.java | 9 ++++--
.../jaxrs/server/SparkStreamingListener.java | 4 +++
.../demo/jaxrs/server/SparkStreamingOutput.java | 3 ++
.../demo/jaxrs/server/StreamingService.java | 30 ++++++++++++++------
4 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
index 8a1092f..d2b3be9 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -25,18 +25,21 @@ import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
public class Server {
- protected Server() throws Exception {
+ protected Server(String args[]) throws Exception {
JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
sf.setResourceClasses(StreamingService.class);
+
+ String receiverType = args.length == 1 && args[0].equals("-receiverType=queue") ?
+ "queue" : "string";
sf.setResourceProvider(StreamingService.class,
- new SingletonResourceProvider(new StreamingService()));
+ new SingletonResourceProvider(new StreamingService(receiverType)));
sf.setAddress("http://localhost:9000/spark");
sf.create();
}
public static void main(String args[]) throws Exception {
- new Server();
+ new Server(args);
System.out.println("Server ready...");
Thread.sleep(60 * 60 * 1000);
System.out.println("Server exiting");
http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 8a891c2..6cb68e8 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -31,6 +31,7 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
public class SparkStreamingListener implements StreamingListener {
private SparkStreamingOutput streamOutput;
private boolean batchStarted;
+ private long batchStartAt;
public SparkStreamingListener(SparkStreamingOutput streamOutput) {
this.streamOutput = streamOutput;
@@ -38,12 +39,15 @@ public class SparkStreamingListener implements StreamingListener {
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+ System.out.println("Batch processing time in millisecs: " + (System.currentTimeMillis() - batchStartAt));
+
streamOutput.setSparkBatchCompleted();
}
@Override
public synchronized void onBatchStarted(StreamingListenerBatchStarted event) {
batchStarted = true;
+ batchStartAt = System.currentTimeMillis();
notify();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 7324806..300a0f7 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -36,8 +36,10 @@ public class SparkStreamingOutput implements StreamingOutput {
private JavaStreamingContext jssc;
private volatile boolean sparkBatchCompleted;
private volatile boolean outputWriteDone;
+ private long startAt;
public SparkStreamingOutput(JavaStreamingContext jssc) {
this.jssc = jssc;
+ this.startAt = System.currentTimeMillis();
}
@Override
@@ -57,6 +59,7 @@ public class SparkStreamingOutput implements StreamingOutput {
jssc.stop(false);
jssc.close();
+ System.out.println("Total processing time in millisecs: " + (System.currentTimeMillis() - startAt));
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/629af817/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 2ce1531..a35b68a 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
@@ -52,13 +53,12 @@ import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;
@@ -74,7 +74,11 @@ public class StreamingService {
}
private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
- public StreamingService() {
+
+ private String receiverType;
+
+ public StreamingService(String receiverType) {
+ this.receiverType = receiverType;
}
@POST
@@ -97,7 +101,7 @@ public class StreamingService {
TikaContentExtractor tika = new TikaContentExtractor();
TikaContent tikaContent = tika.extract(att.getObject(InputStream.class),
mediaType);
- processStream(async, new StringListReceiver(getStringsFromString(tikaContent.getContent())));
+ processStream(async, getStringsFromString(tikaContent.getContent()));
}
@POST
@@ -105,10 +109,10 @@ public class StreamingService {
@Consumes("text/plain")
@Produces("text/plain")
public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
- processStream(async, new StringListReceiver(getStringsFromInputStream(is)));
+ processStream(async, getStringsFromInputStream(is));
}
- private void processStream(AsyncResponse async, Receiver<String> receiver) {
+ private void processStream(AsyncResponse async, List<String> inputStrings) {
try {
SparkConf sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("JAX-RS Spark Connect " + getRandomId());
@@ -118,7 +122,17 @@ public class StreamingService {
SparkStreamingListener sparkListener = new SparkStreamingListener(streamOut);
jssc.addStreamingListener(sparkListener);
- JavaReceiverInputDStream<String> receiverStream = jssc.receiverStream(receiver);
+ JavaDStream<String> receiverStream = null;
+ if ("queue".equals(receiverType)) {
+ Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
+ for (int i = 0; i < 30; i++) {
+ rddQueue.add(jssc.sparkContext().parallelize(inputStrings));
+ }
+ receiverStream = jssc.queueStream(rddQueue);
+ } else {
+ receiverStream = jssc.receiverStream(new StringListReceiver(inputStrings));
+ }
+
JavaPairDStream<String, Integer> wordCounts = createOutputDStream(receiverStream);
wordCounts.foreachRDD(new OutputFunction(streamOut));
jssc.start();
@@ -135,7 +149,7 @@ public class StreamingService {
}
private static JavaPairDStream<String, Integer> createOutputDStream(
- JavaReceiverInputDStream<String> receiverStream) {
+ JavaDStream<String> receiverStream) {
final JavaDStream<String> words =
receiverStream.flatMap(x -> splitInputString(x));