You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/10 14:41:17 UTC

[09/37] cxf git commit: [CXF-6618] Another minor update

[CXF-6618] Another minor update


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e6d84cf9
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e6d84cf9
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e6d84cf9

Branch: refs/heads/master-jaxrs-2.1
Commit: e6d84cf9f3ac669003402d122178a55e8b7e4493
Parents: da32159
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 6 13:34:38 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 6 13:34:38 2016 +0100

----------------------------------------------------------------------
 .../jaxrs/server/SparkStreamingListener.java    | 56 ++++++++++++++++++++
 .../demo/jaxrs/server/SparkStreamingOutput.java | 52 +-----------------
 .../demo/jaxrs/server/StreamingService.java     |  8 +--
 3 files changed, 63 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/e6d84cf9/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
new file mode 100644
index 0000000..3ee5558
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -0,0 +1,56 @@
+package demo.jaxrs.server;
+
+import org.apache.spark.streaming.scheduler.StreamingListener;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
+
+public class SparkStreamingListener implements StreamingListener {
+    private SparkStreamingOutput sparkStreamingOutput;
+
+    public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) {
+        this.sparkStreamingOutput = sparkStreamingOutput;
+    }
+
+    @Override
+    public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+        // as soon as the batch is finished we let the streaming context go
+        // but this may need to be revisited if a given InputStream happens to be processed in
+        // multiple batches ?
+        sparkStreamingOutput.setBatchCompleted();
+    }
+
+    @Override
+    public void onBatchStarted(StreamingListenerBatchStarted event) {
+    }
+
+    @Override
+    public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
+    }
+
+    @Override
+    public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
+    }
+
+    @Override
+    public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
+    }
+
+    @Override
+    public void onReceiverError(StreamingListenerReceiverError event) {
+    }
+
+    @Override
+    public void onReceiverStarted(StreamingListenerReceiverStarted event) {
+    }
+
+    @Override
+    public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/e6d84cf9/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 2220dd4..e3ac218 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -29,15 +29,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.scheduler.StreamingListener;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
-import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
-import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
 
 public class SparkStreamingOutput implements StreamingOutput {
     private JavaPairDStream<String, Integer> wordCounts;
@@ -52,7 +43,6 @@ public class SparkStreamingOutput implements StreamingOutput {
     @Override
     public void write(final OutputStream output) throws IOException, WebApplicationException {
         wordCounts.foreachRDD(new OutputFunction(output));
-        jssc.addStreamingListener(new SparkStreamingListener());
         jssc.start();
         awaitTermination();
         jssc.stop(false);
@@ -75,7 +65,7 @@ public class SparkStreamingOutput implements StreamingOutput {
         }
     }
     
-    private synchronized void setBatchCompleted() {
+    public synchronized void setBatchCompleted() {
         batchCompleted = true;
     }
     
@@ -105,43 +95,5 @@ public class SparkStreamingOutput implements StreamingOutput {
         }
         
     }
-    private class SparkStreamingListener implements StreamingListener {
-
-        @Override
-        public void onBatchCompleted(StreamingListenerBatchCompleted event) {
-            // as soon as the batch is finished we let the streaming context go
-            // but this may need to be revisited if a given InputStream happens to be processed in
-            // multiple batches ?
-            setBatchCompleted();
-        }
-
-        @Override
-        public void onBatchStarted(StreamingListenerBatchStarted event) {
-        }
-
-        @Override
-        public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
-        }
-
-        @Override
-        public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
-        }
-
-        @Override
-        public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
-        }
-
-        @Override
-        public void onReceiverError(StreamingListenerReceiverError event) {
-        }
-
-        @Override
-        public void onReceiverStarted(StreamingListenerReceiverStarted event) {
-        }
-
-        @Override
-        public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
-        }
-        
-    }
+    
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/e6d84cf9/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index b65a68b..19d1dac 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -66,11 +66,13 @@ public class StreamingService {
             JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
             JavaReceiverInputDStream<String> receiverStream = 
                 jssc.receiverStream(new InputStreamReceiver(is));
-            
+            SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc, 
+                                                                createOutputDStream(receiverStream));
+            jssc.addStreamingListener(new SparkStreamingListener(streamOut));
+                                        
             executor.execute(new Runnable() {
                 public void run() {
-                     async.resume(new SparkStreamingOutput(jssc, 
-                                      createOutputDStream(receiverStream)));
+                     async.resume(streamOut);
                 }
             });
         } catch (Exception ex) {