You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/06 12:56:09 UTC

cxf git commit: [CXF-6618] Simplifying the sync code

Repository: cxf
Updated Branches:
  refs/heads/master e6d84cf9f -> 203b5433b


[CXF-6618] Simplifying the sync code


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/203b5433
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/203b5433
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/203b5433

Branch: refs/heads/master
Commit: 203b5433bc44aa831bdd9b1dd4ab474d192dea6c
Parents: e6d84cf
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 6 13:55:53 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 6 13:55:53 2016 +0100

----------------------------------------------------------------------
 .../jaxrs/server/SparkStreamingListener.java    |  5 +---
 .../demo/jaxrs/server/SparkStreamingOutput.java | 28 +++++++-------------
 .../demo/jaxrs/server/StreamingService.java     |  5 ++--
 3 files changed, 13 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 3ee5558..1881857 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -19,10 +19,6 @@ public class SparkStreamingListener implements StreamingListener {
 
     @Override
     public void onBatchCompleted(StreamingListenerBatchCompleted event) {
-        // as soon as the batch is finished we let the streaming context go
-        // but this may need to be revisited if a given InputStream happens to be processed in
-        // multiple batches ?
-        sparkStreamingOutput.setBatchCompleted();
     }
 
     @Override
@@ -35,6 +31,7 @@ public class SparkStreamingListener implements StreamingListener {
 
     @Override
     public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
+        sparkStreamingOutput.setOperationCompleted();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index e3ac218..e28bb5a 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -33,8 +33,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 public class SparkStreamingOutput implements StreamingOutput {
     private JavaPairDStream<String, Integer> wordCounts;
     private JavaStreamingContext jssc;
-    private boolean sparkDone;
-    private boolean batchCompleted;
+    private boolean operationCompleted;
     public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream<String, Integer> wordCounts) {
         this.jssc = jssc;
         this.wordCounts = wordCounts;
@@ -44,13 +43,13 @@ public class SparkStreamingOutput implements StreamingOutput {
     public void write(final OutputStream output) throws IOException, WebApplicationException {
         wordCounts.foreachRDD(new OutputFunction(output));
         jssc.start();
-        awaitTermination();
+        waitForOperationCompleted();
         jssc.stop(false);
         jssc.close();
     }
 
-    private synchronized void awaitTermination() {
-        while (!sparkDone) {
+    private synchronized void waitForOperationCompleted() {
+        while (!operationCompleted) {
             try {
                 wait();
             } catch (InterruptedException e) {
@@ -58,18 +57,14 @@ public class SparkStreamingOutput implements StreamingOutput {
             }
         }
     }
-    private synchronized void releaseStreamingContext() {
-        if (batchCompleted) {
-            sparkDone = true;
-            notify();
-        }
-    }
-    
-    public synchronized void setBatchCompleted() {
-        batchCompleted = true;
-    }
     
     
+    public synchronized void setOperationCompleted() {
+        this.operationCompleted = true;
+        notify();
+    }
+
+
     // This dedicated class was introduced to validate that when Spark is running it does not
     // fail the processing due to OutputStream being one of the fields in the serializable class,
     private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
@@ -89,9 +84,6 @@ public class SparkStreamingOutput implements StreamingOutput {
                     throw new WebApplicationException(); 
                 }
             }
-            // Right now we assume by the time we call it the whole InputStream has been
-            // processed
-            releaseStreamingContext();
         }
         
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/203b5433/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 19d1dac..7d14e9e 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -52,9 +52,9 @@ import scala.Tuple2;
 public class StreamingService {
     private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                                        new ArrayBlockingQueue<Runnable>(10));
-    private SparkConf sparkConf;
+    private JavaStreamingContext jssc;
     public StreamingService(SparkConf sparkConf) {
-        this.sparkConf = sparkConf;
+        jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
     }
     
     @POST
@@ -63,7 +63,6 @@ public class StreamingService {
     @Produces("text/plain")
     public void getStream(@Suspended AsyncResponse async, InputStream is) {
         try {
-            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
             JavaReceiverInputDStream<String> receiverStream = 
                 jssc.receiverStream(new InputStreamReceiver(is));
             SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc,