You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/13 12:42:20 UTC

cxf git commit: Better splitting of the demo PDF content

Repository: cxf
Updated Branches:
  refs/heads/master f564ac8d8 -> 0d5577ab8


Better splitting of the demo PDF content


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0d5577ab
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0d5577ab
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0d5577ab

Branch: refs/heads/master
Commit: 0d5577ab87f324205565c4534b59e69049505356
Parents: f564ac8
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 13 13:42:04 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 13 13:42:04 2016 +0100

----------------------------------------------------------------------
 .../demo/jaxrs/server/StreamingService.java     | 65 ++++++++++----------
 1 file changed, 34 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0d5577ab/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 4f82b5e..0d2fdea 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -46,9 +48,6 @@ import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -56,6 +55,9 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
 import org.apache.tika.parser.odf.OpenDocumentParser;
 import org.apache.tika.parser.pdf.PDFParser;
 
@@ -95,6 +97,8 @@ public class StreamingService {
             }
         }
         
+        ParseContext context = new ParseContext();
+        context.set(Parser.class, new AutoDetectParser());
         TikaContent tikaContent = tika.extract(att.getObject(InputStream.class),
                                                mediaType);
         processStream(async, new TikaReceiver(tikaContent));
@@ -134,38 +138,33 @@ public class StreamingService {
         }
     }
     
-    private static String getRandomId() {
-        byte[] bytes = new byte[10];
-        new Random().nextBytes(bytes);
-        return Base64Utility.encode(bytes);
-    }
-
-    @SuppressWarnings("serial")
     private static JavaPairDStream<String, Integer> createOutputDStream(
             JavaReceiverInputDStream<String> receiverStream) {
-        final JavaDStream<String> words = receiverStream.flatMap(
-            new FlatMapFunction<String, String>() {
-                @Override 
-                public Iterator<String> call(String x) {
-                    return Arrays.asList(x.split(" ")).iterator();
-                }
-            });
-        final JavaPairDStream<String, Integer> pairs = words.mapToPair(
-            new PairFunction<String, String, Integer>() {
+        final JavaDStream<String> words = 
+            receiverStream.flatMap(x -> splitInputString(x));
             
-                @Override 
-                public Tuple2<String, Integer> call(String s) {
+        final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> {
                     return new Tuple2<String, Integer>(s, 1);
-                }
-            });
-        return pairs.reduceByKey(
-            new Function2<Integer, Integer, Integer>() {
-             
-                @Override 
-                public Integer call(Integer i1, Integer i2) {
+                });
+        return pairs.reduceByKey((i1, i2) -> {
                     return i1 + i2;
+                });
+    }
+    private static Iterator<String> splitInputString(String x) {
+        List<String> list = new LinkedList<String>();
+        for (String s : Arrays.asList(x.split(" "))) {
+            s = s.replaceAll("[\\s\n\r]", " ").trim();
+            for (String s2 : Arrays.asList(s.split(" "))) {
+                s2 = s2.trim();
+                if (s2.endsWith(":") || s2.endsWith(",") || s2.endsWith(";") || s2.endsWith(".")) {
+                    s2 = s2.substring(0, s2.length() - 1);
                 }
-            });
+                if (!s2.isEmpty()) {
+                    list.add(s2);
+                }
+            }
+        }
+        return list.iterator();
     }
     private static class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
         private static final long serialVersionUID = 1L;
@@ -176,11 +175,15 @@ public class StreamingService {
         @Override
         public void call(JavaPairRDD<String, Integer> rdd) {
             for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
-                String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
+                String value = entry.getKey() + " : " + entry.getValue() + "\n";
                 streamOut.addResponseEntry(value);
             }
         }
         
     }
-    
+    private static String getRandomId() {
+        byte[] bytes = new byte[10];
+        new Random().nextBytes(bytes);
+        return Base64Utility.encode(bytes);
+    }
 }