You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/20 11:50:14 UTC

cxf git commit: A few updates to the Spark demo

Repository: cxf
Updated Branches:
  refs/heads/master 44c608c20 -> 0af65a4ab


A few updates to the Spark demo


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0af65a4a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0af65a4a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0af65a4a

Branch: refs/heads/master
Commit: 0af65a4ab3ef2c31b9e88d34ddfbb05c79f0f50c
Parents: 44c608c
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 20 12:49:58 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 20 12:49:58 2016 +0100

----------------------------------------------------------------------
 .../demo/jaxrs/server/InputStreamReceiver.java  | 63 --------------------
 .../demo/jaxrs/server/StreamingService.java     | 42 +++++++++----
 .../demo/jaxrs/server/StringListReceiver.java   | 44 ++++++++++++++
 .../java/demo/jaxrs/server/TikaReceiver.java    | 43 -------------
 4 files changed, 75 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
deleted file mode 100644
index 790ee35..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.receiver.Receiver;
-
-public class InputStreamReceiver extends Receiver<String> {
-
-    private static final long serialVersionUID = 1L;
-    private List<String> inputStrings = new LinkedList<String>();
-    
-    public InputStreamReceiver(InputStream is) {
-        super(StorageLevel.MEMORY_ONLY());
-        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-        String userInput = null;
-        while ((userInput = readLine(reader)) != null) {
-            inputStrings.add(userInput);
-        }
-    }
-    @Override
-    public void onStart() {
-        super.store(inputStrings.iterator());
-    }
-
-    private String readLine(BufferedReader reader) {
-        try {
-            return reader.readLine();
-        } catch (IOException ex) {
-            throw new WebApplicationException(500);
-        }
-    }
-    @Override
-    public void onStop() {
-        // complete
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 000d8bc..2ce1531 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -18,7 +18,11 @@
  */
 package demo.jaxrs.server;
 
+import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -93,7 +97,7 @@ public class StreamingService {
         TikaContentExtractor tika = new TikaContentExtractor();
         TikaContent tikaContent = tika.extract(att.getObject(InputStream.class),
                                                mediaType);
-        processStream(async, new TikaReceiver(tikaContent));
+        processStream(async, new StringListReceiver(getStringsFromString(tikaContent.getContent())));
     }
     
     @POST
@@ -101,7 +105,7 @@ public class StreamingService {
     @Consumes("text/plain")
     @Produces("text/plain")
     public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
-        processStream(async, new InputStreamReceiver(is));
+        processStream(async, new StringListReceiver(getStringsFromInputStream(is)));
     }
 
     private void processStream(AsyncResponse async, Receiver<String> receiver) {
@@ -145,15 +149,12 @@ public class StreamingService {
     private static Iterator<String> splitInputString(String x) {
         List<String> list = new LinkedList<String>();
         for (String s : Arrays.asList(x.split(" "))) {
-            s = s.replaceAll("[\\s\n\r]", " ").trim();
-            for (String s2 : Arrays.asList(s.split(" "))) {
-                s2 = s2.trim();
-                if (s2.endsWith(":") || s2.endsWith(",") || s2.endsWith(";") || s2.endsWith(".")) {
-                    s2 = s2.substring(0, s2.length() - 1);
-                }
-                if (!s2.isEmpty()) {
-                    list.add(s2);
-                }
+            s = s.trim();
+            if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || s.endsWith(".")) {
+                s = s.substring(0, s.length() - 1);
+            }
+            if (!s.isEmpty()) {
+                list.add(s);
             }
         }
         return list.iterator();
@@ -178,4 +179,23 @@ public class StreamingService {
         new Random().nextBytes(bytes);
         return Base64Utility.encode(bytes);
     }
+    private List<String> getStringsFromInputStream(InputStream is) {
+        return getStringsFromReader(new BufferedReader(new InputStreamReader(is)));
+    }
+    private List<String> getStringsFromString(String s) {
+        return getStringsFromReader(new BufferedReader(new StringReader(s)));
+    }
+    private List<String> getStringsFromReader(BufferedReader reader) {
+        
+        List<String> inputStrings = new LinkedList<String>();
+        String userInput = null;
+        try {
+            while ((userInput = reader.readLine()) != null) {
+                inputStrings.add(userInput);
+            }
+        } catch (IOException ex) {
+            throw new WebApplicationException(ex);
+        }
+        return inputStrings;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
new file mode 100644
index 0000000..55b6428
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import java.util.List;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class StringListReceiver extends Receiver<String> {
+
+    private static final long serialVersionUID = 1L;
+    private List<String> inputStrings;
+    
+    public StringListReceiver(List<String> inputStrings) {
+        super(StorageLevel.MEMORY_ONLY());
+        this.inputStrings = inputStrings;
+    }
+    @Override
+    public void onStart() {
+        super.store(inputStrings.iterator());
+    }
+    @Override
+    public void onStop() {
+        // complete
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
deleted file mode 100644
index daab2be..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.receiver.Receiver;
-
-public class TikaReceiver extends Receiver<String> {
-
-    private static final long serialVersionUID = 1L;
-    private TikaContent tikaContent;
-    
-    public TikaReceiver(TikaContent tikaContent) {
-        super(StorageLevel.MEMORY_ONLY());
-        this.tikaContent = tikaContent;
-    }
-    @Override
-    public void onStart() {
-        super.store(tikaContent.getContent());
-    }
-    @Override
-    public void onStop() {
-        // complete
-    }
-    
-}