You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/02/11 22:53:16 UTC

cxf git commit: Avoiding the checkstyle issues for now...

Repository: cxf
Updated Branches:
  refs/heads/master 22755d26e -> f2f8b95ad


Avoiding the checkstyle issues for now...


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/f2f8b95a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/f2f8b95a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/f2f8b95a

Branch: refs/heads/master
Commit: f2f8b95ad823b38aa68e7a2071eac0b5b637b2aa
Parents: 22755d2
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Thu Feb 11 21:53:01 2016 +0000
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Thu Feb 11 21:53:01 2016 +0000

----------------------------------------------------------------------
 .../jaxrs/server/AdvancedStreamingService.java  | 41 ++------------------
 1 file changed, 3 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/f2f8b95a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
index 9ccf847..412935d 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
@@ -19,7 +19,6 @@
 package demo.jaxrs.server;
 
 import java.io.InputStream;
-import java.util.Arrays;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.POST;
@@ -28,19 +27,12 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.StreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.dstream.ReceiverInputDStream;
 import org.apache.spark.streaming.receiver.Receiver;
 
-import scala.Tuple2;
 import scala.reflect.ClassTag;
 
 // INCOMPLETE
@@ -48,10 +40,9 @@ import scala.reflect.ClassTag;
 @Path("/")
 public class AdvancedStreamingService {
     private JavaStreamingContext jssc;
-    private MyReceiverInputDStream receiverInputDStream;
     public AdvancedStreamingService(SparkConf sparkConf) {
         this.jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
-        this.receiverInputDStream = new MyReceiverInputDStream(jssc.ssc(), 
+        new MyReceiverInputDStream(jssc.ssc(), 
                                    scala.reflect.ClassTag$.MODULE$.apply(String.class));
     }
     
@@ -64,34 +55,7 @@ public class AdvancedStreamingService {
         return null;
     }
 
-    @SuppressWarnings("serial")
-    private static JavaPairDStream<String, Integer> createOutputDStream(JavaReceiverInputDStream<String> receiverStream) {
-        final JavaDStream<String> words = receiverStream.flatMap(
-            new FlatMapFunction<String, String>() {
-                @Override 
-                public Iterable<String> call(String x) {
-                    return Arrays.asList(x.split(" "));
-                }
-            });
-        final JavaPairDStream<String, Integer> pairs = words.mapToPair(
-            new PairFunction<String, String, Integer>() {
-            
-                @Override 
-                public Tuple2<String, Integer> call(String s) {
-                    return new Tuple2<String, Integer>(s, 1);
-                }
-            });
-        return pairs.reduceByKey(
-            new Function2<Integer, Integer, Integer>() {
-             
-                @Override 
-                public Integer call(Integer i1, Integer i2) {
-                    return i1 + i2;
-                }
-            });
-    }
-   
-    
+     
     public static class MyReceiverInputDStream extends ReceiverInputDStream<String> {
 
         public MyReceiverInputDStream(StreamingContext ssc_, ClassTag<String> evidence$1) {
@@ -102,6 +66,7 @@ public class AdvancedStreamingService {
         }
         @Override
         public Receiver<String> getReceiver() {
+            // A receiver can be created for every String in the input stream
             return new InputStreamReceiver(getInputStream());
         }
         public InputStream getInputStream() {