You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2016/09/20 11:50:14 UTC
cxf git commit: Few updates to the spark demo
Repository: cxf
Updated Branches:
refs/heads/master 44c608c20 -> 0af65a4ab
Few updates to the spark demo
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0af65a4a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0af65a4a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0af65a4a
Branch: refs/heads/master
Commit: 0af65a4ab3ef2c31b9e88d34ddfbb05c79f0f50c
Parents: 44c608c
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Tue Sep 20 12:49:58 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Tue Sep 20 12:49:58 2016 +0100
----------------------------------------------------------------------
.../demo/jaxrs/server/InputStreamReceiver.java | 63 --------------------
.../demo/jaxrs/server/StreamingService.java | 42 +++++++++----
.../demo/jaxrs/server/StringListReceiver.java | 44 ++++++++++++++
.../java/demo/jaxrs/server/TikaReceiver.java | 43 -------------
4 files changed, 75 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
deleted file mode 100644
index 790ee35..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/InputStreamReceiver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.ws.rs.WebApplicationException;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.receiver.Receiver;
-
-public class InputStreamReceiver extends Receiver<String> {
-
- private static final long serialVersionUID = 1L;
- private List<String> inputStrings = new LinkedList<String>();
-
- public InputStreamReceiver(InputStream is) {
- super(StorageLevel.MEMORY_ONLY());
- BufferedReader reader = new BufferedReader(new InputStreamReader(is));
- String userInput = null;
- while ((userInput = readLine(reader)) != null) {
- inputStrings.add(userInput);
- }
- }
- @Override
- public void onStart() {
- super.store(inputStrings.iterator());
- }
-
- private String readLine(BufferedReader reader) {
- try {
- return reader.readLine();
- } catch (IOException ex) {
- throw new WebApplicationException(500);
- }
- }
- @Override
- public void onStop() {
- // complete
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index 000d8bc..2ce1531 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -18,7 +18,11 @@
*/
package demo.jaxrs.server;
+import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.StringReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
@@ -93,7 +97,7 @@ public class StreamingService {
TikaContentExtractor tika = new TikaContentExtractor();
TikaContent tikaContent = tika.extract(att.getObject(InputStream.class),
mediaType);
- processStream(async, new TikaReceiver(tikaContent));
+ processStream(async, new StringListReceiver(getStringsFromString(tikaContent.getContent())));
}
@POST
@@ -101,7 +105,7 @@ public class StreamingService {
@Consumes("text/plain")
@Produces("text/plain")
public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
- processStream(async, new InputStreamReceiver(is));
+ processStream(async, new StringListReceiver(getStringsFromInputStream(is)));
}
private void processStream(AsyncResponse async, Receiver<String> receiver) {
@@ -145,15 +149,12 @@ public class StreamingService {
private static Iterator<String> splitInputString(String x) {
List<String> list = new LinkedList<String>();
for (String s : Arrays.asList(x.split(" "))) {
- s = s.replaceAll("[\\s\n\r]", " ").trim();
- for (String s2 : Arrays.asList(s.split(" "))) {
- s2 = s2.trim();
- if (s2.endsWith(":") || s2.endsWith(",") || s2.endsWith(";") || s2.endsWith(".")) {
- s2 = s2.substring(0, s2.length() - 1);
- }
- if (!s2.isEmpty()) {
- list.add(s2);
- }
+ s = s.trim();
+ if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || s.endsWith(".")) {
+ s = s.substring(0, s.length() - 1);
+ }
+ if (!s.isEmpty()) {
+ list.add(s);
}
}
return list.iterator();
@@ -178,4 +179,23 @@ public class StreamingService {
new Random().nextBytes(bytes);
return Base64Utility.encode(bytes);
}
+ private List<String> getStringsFromInputStream(InputStream is) {
+ return getStringsFromReader(new BufferedReader(new InputStreamReader(is)));
+ }
+ private List<String> getStringsFromString(String s) {
+ return getStringsFromReader(new BufferedReader(new StringReader(s)));
+ }
+ private List<String> getStringsFromReader(BufferedReader reader) {
+
+ List<String> inputStrings = new LinkedList<String>();
+ String userInput = null;
+ try {
+ while ((userInput = reader.readLine()) != null) {
+ inputStrings.add(userInput);
+ }
+ } catch (IOException ex) {
+ throw new WebApplicationException(ex);
+ }
+ return inputStrings;
+ }
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
new file mode 100644
index 0000000..55b6428
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StringListReceiver.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import java.util.List;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class StringListReceiver extends Receiver<String> {
+
+ private static final long serialVersionUID = 1L;
+ private List<String> inputStrings;
+
+ public StringListReceiver(List<String> inputStrings) {
+ super(StorageLevel.MEMORY_ONLY());
+ this.inputStrings = inputStrings;
+ }
+ @Override
+ public void onStart() {
+ super.store(inputStrings.iterator());
+ }
+ @Override
+ public void onStop() {
+ // complete
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/0af65a4a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
deleted file mode 100644
index daab2be..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.receiver.Receiver;
-
-public class TikaReceiver extends Receiver<String> {
-
- private static final long serialVersionUID = 1L;
- private TikaContent tikaContent;
-
- public TikaReceiver(TikaContent tikaContent) {
- super(StorageLevel.MEMORY_ONLY());
- this.tikaContent = tikaContent;
- }
- @Override
- public void onStart() {
- super.store(tikaContent.getContent());
- }
- @Override
- public void onStop() {
- // complete
- }
-
-}