You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/10 14:41:20 UTC
[12/37] cxf git commit: [CXF-6618] Minimizing the dep on the Spark
api in StreamingOutputImpl
[CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/46438c48
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/46438c48
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/46438c48
Branch: refs/heads/master-jaxrs-2.1
Commit: 46438c487115889c4cadd86f70fd4109a0daa2b8
Parents: e2c866a
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 6 15:49:17 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 6 15:49:17 2016 +0100
----------------------------------------------------------------------
.../jaxrs/server/SparkStreamingListener.java | 8 +--
.../demo/jaxrs/server/SparkStreamingOutput.java | 67 +++++++-------------
.../demo/jaxrs/server/StreamingService.java | 27 ++++++--
3 files changed, 49 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 1881857..3fd7284 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -11,14 +11,15 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
public class SparkStreamingListener implements StreamingListener {
- private SparkStreamingOutput sparkStreamingOutput;
+ private SparkStreamingOutput streamOutput;
- public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) {
- this.sparkStreamingOutput = sparkStreamingOutput;
+ public SparkStreamingListener(SparkStreamingOutput streamOutput) {
+ this.streamOutput = streamOutput;
}
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+ streamOutput.setSparkBatchCompleted();
}
@Override
@@ -31,7 +32,6 @@ public class SparkStreamingListener implements StreamingListener {
@Override
public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
- sparkStreamingOutput.setOperationCompleted();
}
@Override
http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index e28bb5a..43166fe 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -20,72 +20,49 @@ package demo.jaxrs.server;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.cxf.common.util.StringUtils;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class SparkStreamingOutput implements StreamingOutput {
- private JavaPairDStream<String, Integer> wordCounts;
+ private BlockingQueue<String> responseQueue = new LinkedBlockingQueue<String>();
+
private JavaStreamingContext jssc;
- private boolean operationCompleted;
- public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream<String, Integer> wordCounts) {
+ private volatile boolean sparkBatchCompleted;
+ public SparkStreamingOutput(JavaStreamingContext jssc) {
this.jssc = jssc;
- this.wordCounts = wordCounts;
}
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
- wordCounts.foreachRDD(new OutputFunction(output));
- jssc.start();
- waitForOperationCompleted();
- jssc.stop(false);
- jssc.close();
- }
-
- private synchronized void waitForOperationCompleted() {
- while (!operationCompleted) {
+ while (!sparkBatchCompleted || !responseQueue.isEmpty()) {
try {
- wait();
+ String responseEntry = responseQueue.poll(1, TimeUnit.MILLISECONDS);
+ if (responseEntry != null) {
+ output.write(StringUtils.toBytesUTF8(responseEntry));
+ output.flush();
+ }
} catch (InterruptedException e) {
- return;
+ // continue;
}
}
+
+ jssc.stop(false);
+ jssc.close();
}
- public synchronized void setOperationCompleted() {
- this.operationCompleted = true;
- notify();
+ public void setSparkBatchCompleted() {
+ this.sparkBatchCompleted = true;
}
-
- // This dedicated class was introduced to validate that when Spark is running it does not
- // fail the processing due to OutputStream being one of the fields in the serializable class,
- private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
- private static final long serialVersionUID = 1L;
- private OutputStream os;
- OutputFunction(OutputStream os) {
- this.os = os;
- }
- @Override
- public void call(JavaPairRDD<String, Integer> rdd) {
- for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
- String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
- try {
- os.write(value.getBytes());
- os.flush();
- } catch (IOException ex) {
- throw new WebApplicationException();
- }
- }
- }
-
+ public void addResponseEntry(String value) {
+ responseQueue.add(value);
}
-
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 7d14e9e..f986225 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -21,6 +21,7 @@ package demo.jaxrs.server;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -36,9 +37,11 @@ import javax.ws.rs.container.Suspended;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -65,10 +68,12 @@ public class StreamingService {
try {
JavaReceiverInputDStream<String> receiverStream =
jssc.receiverStream(new InputStreamReceiver(is));
- SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc,
- createOutputDStream(receiverStream));
+ SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
jssc.addStreamingListener(new SparkStreamingListener(streamOut));
-
+ JavaPairDStream<String, Integer> wordCounts = createOutputDStream(receiverStream);
+ wordCounts.foreachRDD(new OutputFunction(streamOut));
+ jssc.start();
+
executor.execute(new Runnable() {
public void run() {
async.resume(streamOut);
@@ -111,6 +116,20 @@ public class StreamingService {
}
});
}
-
+ private static class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+ private SparkStreamingOutput streamOut;
+ OutputFunction(SparkStreamingOutput streamOut) {
+ this.streamOut = streamOut;
+ }
+ @Override
+ public void call(JavaPairRDD<String, Integer> rdd) {
+ for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
+ String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
+ streamOut.addResponseEntry(value);
+ }
+ }
+
+ }
}