You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/06 12:56:09 UTC
cxf git commit: [CXF-6618] Simplifying the sync code
Repository: cxf
Updated Branches:
refs/heads/master e6d84cf9f -> 203b5433b
[CXF-6618] Simplifying the sync code
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/203b5433
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/203b5433
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/203b5433
Branch: refs/heads/master
Commit: 203b5433bc44aa831bdd9b1dd4ab474d192dea6c
Parents: e6d84cf
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 6 13:55:53 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 6 13:55:53 2016 +0100
----------------------------------------------------------------------
.../jaxrs/server/SparkStreamingListener.java | 5 +---
.../demo/jaxrs/server/SparkStreamingOutput.java | 28 +++++++-------------
.../demo/jaxrs/server/StreamingService.java | 5 ++--
3 files changed, 13 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 3ee5558..1881857 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -19,10 +19,6 @@ public class SparkStreamingListener implements StreamingListener {
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted event) {
- // as soon as the batch is finished we let the streaming context go
- // but this may need to be revisited if a given InputStream happens to be processed in
- // multiple batches ?
- sparkStreamingOutput.setBatchCompleted();
}
@Override
@@ -35,6 +31,7 @@ public class SparkStreamingListener implements StreamingListener {
@Override
public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
+ sparkStreamingOutput.setOperationCompleted();
}
@Override
http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index e3ac218..e28bb5a 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -33,8 +33,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class SparkStreamingOutput implements StreamingOutput {
private JavaPairDStream<String, Integer> wordCounts;
private JavaStreamingContext jssc;
- private boolean sparkDone;
- private boolean batchCompleted;
+ private boolean operationCompleted;
public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream<String, Integer> wordCounts) {
this.jssc = jssc;
this.wordCounts = wordCounts;
@@ -44,13 +43,13 @@ public class SparkStreamingOutput implements StreamingOutput {
public void write(final OutputStream output) throws IOException, WebApplicationException {
wordCounts.foreachRDD(new OutputFunction(output));
jssc.start();
- awaitTermination();
+ waitForOperationCompleted();
jssc.stop(false);
jssc.close();
}
- private synchronized void awaitTermination() {
- while (!sparkDone) {
+ private synchronized void waitForOperationCompleted() {
+ while (!operationCompleted) {
try {
wait();
} catch (InterruptedException e) {
@@ -58,18 +57,14 @@ public class SparkStreamingOutput implements StreamingOutput {
}
}
}
- private synchronized void releaseStreamingContext() {
- if (batchCompleted) {
- sparkDone = true;
- notify();
- }
- }
-
- public synchronized void setBatchCompleted() {
- batchCompleted = true;
- }
+ public synchronized void setOperationCompleted() {
+ this.operationCompleted = true;
+ notify();
+ }
+
+
// This dedicated class was introduced to validate that when Spark is running it does not
// fail the processing due to OutputStream being one of the fields in the serializable class.
private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
@@ -89,9 +84,6 @@ public class SparkStreamingOutput implements StreamingOutput {
throw new WebApplicationException();
}
}
- // Right now we assume by the time we call it the whole InputStream has been
- // processed
- releaseStreamingContext();
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 19d1dac..7d14e9e 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -52,9 +52,9 @@ import scala.Tuple2;
public class StreamingService {
private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));
- private SparkConf sparkConf;
+ private JavaStreamingContext jssc;
public StreamingService(SparkConf sparkConf) {
- this.sparkConf = sparkConf;
+ jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
}
@POST
@@ -63,7 +63,6 @@ public class StreamingService {
@Produces("text/plain")
public void getStream(@Suspended AsyncResponse async, InputStream is) {
try {
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<String> receiverStream =
jssc.receiverStream(new InputStreamReceiver(is));
SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc,