You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this text copy.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/13 12:42:20 UTC
cxf git commit: Better splitting of the demo PDF content
Repository: cxf
Updated Branches:
refs/heads/master f564ac8d8 -> 0d5577ab8
Better splitting of the demo PDF content
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0d5577ab
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0d5577ab
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0d5577ab
Branch: refs/heads/master
Commit: 0d5577ab87f324205565c4534b59e69049505356
Parents: f564ac8
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 13 13:42:04 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 13 13:42:04 2016 +0100
----------------------------------------------------------------------
.../demo/jaxrs/server/StreamingService.java | 65 ++++++++++----------
1 file changed, 34 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/0d5577ab/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 4f82b5e..0d2fdea 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
@@ -46,9 +48,6 @@ import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -56,6 +55,9 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
import org.apache.tika.parser.odf.OpenDocumentParser;
import org.apache.tika.parser.pdf.PDFParser;
@@ -95,6 +97,8 @@ public class StreamingService {
}
}
+ ParseContext context = new ParseContext();
+ context.set(Parser.class, new AutoDetectParser());
TikaContent tikaContent = tika.extract(att.getObject(InputStream.class),
mediaType);
processStream(async, new TikaReceiver(tikaContent));
@@ -134,38 +138,33 @@ public class StreamingService {
}
}
- private static String getRandomId() {
- byte[] bytes = new byte[10];
- new Random().nextBytes(bytes);
- return Base64Utility.encode(bytes);
- }
-
- @SuppressWarnings("serial")
private static JavaPairDStream<String, Integer> createOutputDStream(
JavaReceiverInputDStream<String> receiverStream) {
- final JavaDStream<String> words = receiverStream.flatMap(
- new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- });
- final JavaPairDStream<String, Integer> pairs = words.mapToPair(
- new PairFunction<String, String, Integer>() {
+ final JavaDStream<String> words =
+ receiverStream.flatMap(x -> splitInputString(x));
- @Override
- public Tuple2<String, Integer> call(String s) {
+ final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> {
return new Tuple2<String, Integer>(s, 1);
- }
- });
- return pairs.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
-
- @Override
- public Integer call(Integer i1, Integer i2) {
+ });
+ return pairs.reduceByKey((i1, i2) -> {
return i1 + i2;
+ });
+ }
+ private static Iterator<String> splitInputString(String x) {
+ List<String> list = new LinkedList<String>();
+ for (String s : Arrays.asList(x.split(" "))) {
+ s = s.replaceAll("[\\s\n\r]", " ").trim();
+ for (String s2 : Arrays.asList(s.split(" "))) {
+ s2 = s2.trim();
+ if (s2.endsWith(":") || s2.endsWith(",") || s2.endsWith(";") || s2.endsWith(".")) {
+ s2 = s2.substring(0, s2.length() - 1);
}
- });
+ if (!s2.isEmpty()) {
+ list.add(s2);
+ }
+ }
+ }
+ return list.iterator();
}
private static class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
private static final long serialVersionUID = 1L;
@@ -176,11 +175,15 @@ public class StreamingService {
@Override
public void call(JavaPairRDD<String, Integer> rdd) {
for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
- String value = entry.getKey() + " : " + entry.getValue() + "\r\n";
+ String value = entry.getKey() + " : " + entry.getValue() + "\n";
streamOut.addResponseEntry(value);
}
}
}
-
+ private static String getRandomId() {
+ byte[] bytes = new byte[10];
+ new Random().nextBytes(bytes);
+ return Base64Utility.encode(bytes);
+ }
}