You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/10 14:41:20 UTC

[12/37] cxf git commit: [CXF-6618] Minimizing the dep on the Spark api in StreamingOutputImpl

[CXF-6618] Minimizing the dep  on the Spark api in StreamingOutputImpl


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/46438c48
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/46438c48
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/46438c48

Branch: refs/heads/master-jaxrs-2.1
Commit: 46438c487115889c4cadd86f70fd4109a0daa2b8
Parents: e2c866a
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 6 15:49:17 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 6 15:49:17 2016 +0100

----------------------------------------------------------------------
 .../jaxrs/server/SparkStreamingListener.java    |  8 +--
 .../demo/jaxrs/server/SparkStreamingOutput.java | 67 +++++++-------------
 .../demo/jaxrs/server/StreamingService.java     | 27 ++++++--
 3 files changed, 49 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
index 1881857..3fd7284 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -11,14 +11,15 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
 
 public class SparkStreamingListener implements StreamingListener {
-    private SparkStreamingOutput sparkStreamingOutput;
+    private SparkStreamingOutput streamOutput;
 
-    public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) {
-        this.sparkStreamingOutput = sparkStreamingOutput;
+    public SparkStreamingListener(SparkStreamingOutput streamOutput) {
+        this.streamOutput = streamOutput;
     }
 
     @Override
     public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+        streamOutput.setSparkBatchCompleted();
     }
 
     @Override
@@ -31,7 +32,6 @@ public class SparkStreamingListener implements StreamingListener {
 
     @Override
     public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
-        sparkStreamingOutput.setOperationCompleted();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index e28bb5a..43166fe 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -20,72 +20,49 @@ package demo.jaxrs.server;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.StreamingOutput;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.cxf.common.util.StringUtils;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
 public class SparkStreamingOutput implements StreamingOutput {
-    private JavaPairDStream<String, Integer> wordCounts;
+    private BlockingQueue<String> responseQueue = new LinkedBlockingQueue<String>();
+    
     private JavaStreamingContext jssc;
-    private boolean operationCompleted;
-    public SparkStreamingOutput(JavaStreamingContext jssc, JavaPairDStream<String, Integer> wordCounts) {
+    private volatile boolean sparkBatchCompleted;
+    public SparkStreamingOutput(JavaStreamingContext jssc) {
         this.jssc = jssc;
-        this.wordCounts = wordCounts;
     }
 
     @Override
     public void write(final OutputStream output) throws IOException, WebApplicationException {
-        wordCounts.foreachRDD(new OutputFunction(output));
-        jssc.start();
-        waitForOperationCompleted();
-        jssc.stop(false);
-        jssc.close();
-    }
-
-    private synchronized void waitForOperationCompleted() {
-        while (!operationCompleted) {
+        while (!sparkBatchCompleted || !responseQueue.isEmpty()) {
             try {
-                wait();
+                String responseEntry = responseQueue.poll(1, TimeUnit.MILLISECONDS);
+                if (responseEntry != null) {
+                    output.write(StringUtils.toBytesUTF8(responseEntry));
+                    output.flush();
+                }
             } catch (InterruptedException e) {
-                return;
+                // continue;
             }
         }
+        
+        jssc.stop(false);
+        jssc.close();
     }
     
     
-    public synchronized void setOperationCompleted() {
-        this.operationCompleted = true;
-        notify();
+    public void setSparkBatchCompleted() {
+        this.sparkBatchCompleted = true;
     }
 
-
-    // This dedicated class was introduced to validate that when Spark is running it does not
-    // fail the processing due to OutputStream being one of the fields in the serializable class,
-    private class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
-        private static final long serialVersionUID = 1L;
-        private OutputStream os;
-        OutputFunction(OutputStream os) {
-            this.os = os;
-        }
-        @Override
-        public void call(JavaPairRDD<String, Integer> rdd) {
-            for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
-                String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
-                try {
-                    os.write(value.getBytes());
-                    os.flush();
-                } catch (IOException ex) {
-                    throw new WebApplicationException(); 
-                }
-            }
-        }
-        
+    public void addResponseEntry(String value) {
+        responseQueue.add(value);
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/46438c48/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 7d14e9e..f986225 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -21,6 +21,7 @@ package demo.jaxrs.server;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -36,9 +37,11 @@ import javax.ws.rs.container.Suspended;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -65,10 +68,12 @@ public class StreamingService {
         try {
             JavaReceiverInputDStream<String> receiverStream = 
                 jssc.receiverStream(new InputStreamReceiver(is));
-            SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc, 
-                                                                createOutputDStream(receiverStream));
+            SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
             jssc.addStreamingListener(new SparkStreamingListener(streamOut));
-                                        
+            JavaPairDStream<String, Integer> wordCounts = createOutputDStream(receiverStream);
+            wordCounts.foreachRDD(new OutputFunction(streamOut));
+            jssc.start();
+                                                    
             executor.execute(new Runnable() {
                 public void run() {
                      async.resume(streamOut);
@@ -111,6 +116,20 @@ public class StreamingService {
                 }
             });
     }
-   
+    private static class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
+        private static final long serialVersionUID = 1L;
+        private SparkStreamingOutput streamOut;
+        OutputFunction(SparkStreamingOutput streamOut) {
+            this.streamOut = streamOut;
+        }
+        @Override
+        public void call(JavaPairRDD<String, Integer> rdd) {
+            for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
+                String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
+                streamOut.addResponseEntry(value);
+            }
+        }
+        
+    }
     
 }