You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/06 12:34:55 UTC
cxf git commit: [CXF-6618] Another minor update
Repository: cxf
Updated Branches:
refs/heads/master da321593b -> e6d84cf9f
[CXF-6618] Another minor update
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e6d84cf9
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e6d84cf9
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e6d84cf9
Branch: refs/heads/master
Commit: e6d84cf9f3ac669003402d122178a55e8b7e4493
Parents: da32159
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 6 13:34:38 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 6 13:34:38 2016 +0100
----------------------------------------------------------------------
.../jaxrs/server/SparkStreamingListener.java | 56 ++++++++++++++++++++
.../demo/jaxrs/server/SparkStreamingOutput.java | 52 +-----------------
.../demo/jaxrs/server/StreamingService.java | 8 +--
3 files changed, 63 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6d84cf9/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
new file mode 100644
index 0000000..3ee5558
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingListener.java
@@ -0,0 +1,56 @@
+package demo.jaxrs.server;
+
+import org.apache.spark.streaming.scheduler.StreamingListener;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
+import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
+import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
+
+public class SparkStreamingListener implements StreamingListener { // relays batch completion to a SparkStreamingOutput
+ private SparkStreamingOutput sparkStreamingOutput;
+ // Keeps the output whose setBatchCompleted() is invoked from onBatchCompleted.
+ public SparkStreamingListener(SparkStreamingOutput sparkStreamingOutput) {
+ this.sparkStreamingOutput = sparkStreamingOutput;
+ }
+ // Batch finished: flag completion on the wrapped output.
+ @Override
+ public void onBatchCompleted(StreamingListenerBatchCompleted event) {
+ // As soon as the batch is finished we let the streaming context go.
+ // TODO: this may need to be revisited if a given InputStream happens to be
+ // processed in multiple batches.
+ sparkStreamingOutput.setBatchCompleted();
+ }
+ // Empty body: batch-started events are ignored.
+ @Override
+ public void onBatchStarted(StreamingListenerBatchStarted event) {
+ }
+ // Empty body: batch-submitted events are ignored.
+ @Override
+ public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
+ }
+ // Empty body: output-operation-completed events are ignored.
+ @Override
+ public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
+ }
+ // Empty body: output-operation-started events are ignored.
+ @Override
+ public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
+ }
+ // Empty body: receiver-error events are ignored (nothing is logged or propagated here).
+ @Override
+ public void onReceiverError(StreamingListenerReceiverError event) {
+ }
+ // Empty body: receiver-started events are ignored.
+ @Override
+ public void onReceiverStarted(StreamingListenerReceiverStarted event) {
+ }
+ // Empty body: receiver-stopped events are ignored.
+ @Override
+ public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6d84cf9/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 2220dd4..e3ac218 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -29,15 +29,6 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.scheduler.StreamingListener;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
-import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
-import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
public class SparkStreamingOutput implements StreamingOutput {
private JavaPairDStream<String, Integer> wordCounts;
@@ -52,7 +43,6 @@ public class SparkStreamingOutput implements StreamingOutput {
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
wordCounts.foreachRDD(new OutputFunction(output));
- jssc.addStreamingListener(new SparkStreamingListener());
jssc.start();
awaitTermination();
jssc.stop(false);
@@ -75,7 +65,7 @@ public class SparkStreamingOutput implements StreamingOutput {
}
}
- private synchronized void setBatchCompleted() {
+ public synchronized void setBatchCompleted() {
batchCompleted = true;
}
@@ -105,43 +95,5 @@ public class SparkStreamingOutput implements StreamingOutput {
}
}
- private class SparkStreamingListener implements StreamingListener {
-
- @Override
- public void onBatchCompleted(StreamingListenerBatchCompleted event) {
- // as soon as the batch is finished we let the streaming context go
- // but this may need to be revisited if a given InputStream happens to be processed in
- // multiple batches ?
- setBatchCompleted();
- }
-
- @Override
- public void onBatchStarted(StreamingListenerBatchStarted event) {
- }
-
- @Override
- public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
- }
-
- @Override
- public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
- }
-
- @Override
- public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
- }
-
- @Override
- public void onReceiverError(StreamingListenerReceiverError event) {
- }
-
- @Override
- public void onReceiverStarted(StreamingListenerReceiverStarted event) {
- }
-
- @Override
- public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
- }
-
- }
+
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e6d84cf9/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index b65a68b..19d1dac 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -66,11 +66,13 @@ public class StreamingService {
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<String> receiverStream =
jssc.receiverStream(new InputStreamReceiver(is));
-
+ SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc,
+ createOutputDStream(receiverStream));
+ jssc.addStreamingListener(new SparkStreamingListener(streamOut));
+
executor.execute(new Runnable() {
public void run() {
- async.resume(new SparkStreamingOutput(jssc,
- createOutputDStream(receiverStream)));
+ async.resume(streamOut);
}
});
} catch (Exception ex) {