You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/02/11 22:53:16 UTC
cxf git commit: Avoiding the checkstyle issues for now...
Repository: cxf
Updated Branches:
refs/heads/master 22755d26e -> f2f8b95ad
Avoiding the checkstyle issues for now...
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/f2f8b95a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/f2f8b95a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/f2f8b95a
Branch: refs/heads/master
Commit: f2f8b95ad823b38aa68e7a2071eac0b5b637b2aa
Parents: 22755d2
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Thu Feb 11 21:53:01 2016 +0000
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Thu Feb 11 21:53:01 2016 +0000
----------------------------------------------------------------------
.../jaxrs/server/AdvancedStreamingService.java | 41 ++------------------
1 file changed, 3 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/f2f8b95a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
index 9ccf847..412935d 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
@@ -19,7 +19,6 @@
package demo.jaxrs.server;
import java.io.InputStream;
-import java.util.Arrays;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
@@ -28,19 +27,12 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.StreamingOutput;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.receiver.Receiver;
-import scala.Tuple2;
import scala.reflect.ClassTag;
// INCOMPLETE
@@ -48,10 +40,9 @@ import scala.reflect.ClassTag;
@Path("/")
public class AdvancedStreamingService {
private JavaStreamingContext jssc;
- private MyReceiverInputDStream receiverInputDStream;
public AdvancedStreamingService(SparkConf sparkConf) {
this.jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
- this.receiverInputDStream = new MyReceiverInputDStream(jssc.ssc(),
+ new MyReceiverInputDStream(jssc.ssc(),
scala.reflect.ClassTag$.MODULE$.apply(String.class));
}
@@ -64,34 +55,7 @@ public class AdvancedStreamingService {
return null;
}
- @SuppressWarnings("serial")
- private static JavaPairDStream<String, Integer> createOutputDStream(JavaReceiverInputDStream<String> receiverStream) {
- final JavaDStream<String> words = receiverStream.flatMap(
- new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String x) {
- return Arrays.asList(x.split(" "));
- }
- });
- final JavaPairDStream<String, Integer> pairs = words.mapToPair(
- new PairFunction<String, String, Integer>() {
-
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- });
- return pairs.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
-
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
- }
-
-
+
public static class MyReceiverInputDStream extends ReceiverInputDStream<String> {
public MyReceiverInputDStream(StreamingContext ssc_, ClassTag<String> evidence$1) {
@@ -102,6 +66,7 @@ public class AdvancedStreamingService {
}
@Override
public Receiver<String> getReceiver() {
+ // A receiver can be created per every String the input stream
return new InputStreamReceiver(getInputStream());
}
public InputStream getInputStream() {