You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/10 14:41:27 UTC
[19/37] cxf git commit: [CXF-6618] Initial code for processing binary
data with Tika and pushing them to Spark
[CXF-6618] Initial code for processing binary data with Tika and pushing them to Spark
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/4427f779
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/4427f779
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/4427f779
Branch: refs/heads/master-jaxrs-2.1
Commit: 4427f7790e522144baa325e5b5c051885e9dc706
Parents: d68d8d8
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Thu Sep 8 13:36:36 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Thu Sep 8 13:36:36 2016 +0100
----------------------------------------------------------------------
.../release/samples/jax_rs/spark/README.txt | 18 +++--
.../main/release/samples/jax_rs/spark/pom.xml | 15 ++++
.../jaxrs/server/AdvancedStreamingService.java | 80 --------------------
.../src/main/java/demo/jaxrs/server/Server.java | 2 +-
.../demo/jaxrs/server/SparkStreamingOutput.java | 4 +-
.../demo/jaxrs/server/StreamingService.java | 37 ++++++++-
.../java/demo/jaxrs/server/TikaReceiver.java | 43 +++++++++++
.../spark/src/main/resources/multipartForm.html | 49 ++++++++++++
8 files changed, 154 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt
index e6a218d..8a7b292 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt
@@ -1,5 +1,5 @@
-JAX-RS Basic Spark Demo
-=======================
+JAX-RS Spark Streaming Demo
+===========================
This demo demonstrates how to connect HTTP and Spark streams with JAX-RS
@@ -9,15 +9,17 @@ mvn exec:java
Next do:
+1. Simple text processing:
+
curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/stream
-Limitations:
+2. PDF processing:
-This demo accepts one request at a time due to Spark restricting that only a single streaming context can be active
-in JVM at a given moment of time. This is the error which will be logged if you try to access the demo server concurrently:
+Open multipartForm.html located in src/main/resources, select any PDF file available on the local disk and upload it.
+Note that Spark allows only a single streaming context to be active in a JVM at any given time.
+This is the error that will be logged if you try to access the demo server concurrently:
"org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
- To ignore this error, set spark.driver.allowMultipleContexts = true".
-
- More flexible demo server will be added in due time.
+
+To ignore this error, set spark.driver.allowMultipleContexts = true".
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/pom.xml b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
index b2541a0..10a00da 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
@@ -44,6 +44,11 @@
<version>3.2.0-SNAPSHOT</version>
</dependency>
<dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-extension-search</artifactId>
+ <version>3.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>2.0.0-preview</version>
@@ -59,6 +64,16 @@
<artifactId>netty</artifactId>
<version>3.7.0.Final</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-core</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-parser-pdf-module</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
<repositories>
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
deleted file mode 100644
index 1971fd9..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.io.InputStream;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.StreamingOutput;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.StreamingContext;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.ReceiverInputDStream;
-import org.apache.spark.streaming.receiver.Receiver;
-
-import scala.reflect.ClassTag;
-
-// INCOMPLETE
-
-@Path("/")
-public class AdvancedStreamingService {
- private JavaStreamingContext jssc;
- public AdvancedStreamingService(SparkConf sparkConf) {
- this.jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
- new MyReceiverInputDStream(jssc.ssc(),
- scala.reflect.ClassTag$.MODULE$.apply(String.class));
- }
-
- @POST
- @Path("/stream")
- @Consumes("text/plain")
- @Produces("text/plain")
- public StreamingOutput getStream(InputStream is) {
-
- return null;
- }
-
-
- public static class MyReceiverInputDStream extends ReceiverInputDStream<String> {
-
- public MyReceiverInputDStream(StreamingContext ssc, ClassTag<String> evidence) {
- super(ssc, evidence);
- }
- public void putInputStream(InputStream is) {
-
- }
- @Override
- public Receiver<String> getReceiver() {
- // A receiver can be created per every String the input stream
- return new InputStreamReceiver(getInputStream());
- }
- public InputStream getInputStream() {
- // TODO Auto-generated method stub
- return null;
- }
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
index 50f915a..8a1092f 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -30,7 +30,7 @@ public class Server {
sf.setResourceClasses(StreamingService.class);
sf.setResourceProvider(StreamingService.class,
new SingletonResourceProvider(new StreamingService()));
- sf.setAddress("http://localhost:9000/");
+ sf.setAddress("http://localhost:9000/spark");
sf.create();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 43166fe..7324806 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -35,16 +35,18 @@ public class SparkStreamingOutput implements StreamingOutput {
private JavaStreamingContext jssc;
private volatile boolean sparkBatchCompleted;
+ private volatile boolean outputWriteDone;
public SparkStreamingOutput(JavaStreamingContext jssc) {
this.jssc = jssc;
}
@Override
public void write(final OutputStream output) throws IOException, WebApplicationException {
- while (!sparkBatchCompleted || !responseQueue.isEmpty()) {
+ while (!sparkBatchCompleted || !outputWriteDone || !responseQueue.isEmpty()) {
try {
String responseEntry = responseQueue.poll(1, TimeUnit.MILLISECONDS);
if (responseEntry != null) {
+ outputWriteDone = true;
output.write(StringUtils.toBytesUTF8(responseEntry));
output.flush();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index c9cc033..5e059fc 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -35,6 +36,11 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
+import org.apache.cxf.common.util.Base64Utility;
+import org.apache.cxf.jaxrs.ext.multipart.Attachment;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
@@ -47,6 +53,8 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.tika.parser.pdf.PDFParser;
import scala.Tuple2;
@@ -59,20 +67,35 @@ public class StreamingService {
}
@POST
+ @Path("/multipart")
+ @Consumes("multipart/form-data")
+ @Produces("text/plain")
+ public void processMultipartStream(@Suspended AsyncResponse async,
+ @Multipart("file") Attachment att) {
+ TikaContentExtractor tika = new TikaContentExtractor(new PDFParser());
+ TikaContent tikaContent = tika.extract(att.getObject(InputStream.class));
+ processStream(async, new TikaReceiver(tikaContent));
+ }
+
+ @POST
@Path("/stream")
@Consumes("text/plain")
@Produces("text/plain")
- public void getStream(@Suspended AsyncResponse async, InputStream is) {
+ public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
+ processStream(async, new InputStreamReceiver(is));
+ }
+
+ private void processStream(AsyncResponse async, Receiver<String> receiver) {
try {
- SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect");
+ SparkConf sparkConf = new SparkConf().setMaster("local[*]")
+ .setAppName("JAX-RS Spark Connect " + getRandomId());
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
SparkStreamingListener sparkListener = new SparkStreamingListener(streamOut);
jssc.addStreamingListener(sparkListener);
- JavaReceiverInputDStream<String> receiverStream =
- jssc.receiverStream(new InputStreamReceiver(is));
+ JavaReceiverInputDStream<String> receiverStream = jssc.receiverStream(receiver);
JavaPairDStream<String, Integer> wordCounts = createOutputDStream(receiverStream);
wordCounts.foreachRDD(new OutputFunction(streamOut));
jssc.start();
@@ -87,6 +110,12 @@ public class StreamingService {
}
}
}
+
+ private static String getRandomId() {
+ byte[] bytes = new byte[10];
+ new Random().nextBytes(bytes);
+ return Base64Utility.encode(bytes);
+ }
@SuppressWarnings("serial")
private static JavaPairDStream<String, Integer> createOutputDStream(
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
new file mode 100644
index 0000000..daab2be
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class TikaReceiver extends Receiver<String> {
+
+ private static final long serialVersionUID = 1L;
+ private TikaContent tikaContent;
+
+ public TikaReceiver(TikaContent tikaContent) {
+ super(StorageLevel.MEMORY_ONLY());
+ this.tikaContent = tikaContent;
+ }
+ @Override
+ public void onStart() {
+ super.store(tikaContent.getContent());
+ }
+ @Override
+ public void onStop() {
+ // complete
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html b/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
new file mode 100644
index 0000000..264b4f6
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
@@ -0,0 +1,49 @@
+<html xmlns="http://www.w3.org/1999/xhtml">
+<head>
+ <title>Form Upload</title>
+ <STYLE TYPE="text/css">
+ <!--
+ input {font-family:verdana, arial, helvetica, sans-serif;font-size:20px;line-height:40px;}
+ H1 { text-align: center}
+ div.padded {
+ padding-left: 5em;
+ }
+ -->
+</STYLE>
+</head>
+<body>
+<H1>Form Upload</H1>
+<br/>
+ <form action="http://localhost:9000/spark/multipart"
+ enctype="multipart/form-data"
+ method="POST">
+ <div class="padded">
+ <table>
+ <tr>
+ <td><big><big><big>File:</big></big></big></td>
+ <td>
+ <input id="content" size="50" name="file" type="file"/>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2"> </td>
+ </tr>
+ <tr>
+ <td>
+
+ </td>
+ </tr>
+ </table>
+ </div>
+ <table align="center">
+ <tr>
+ <td colspan="2">
+ <input type="submit" value=" Upload "/>
+ </td>
+ </tr>
+ </table>
+ </form>
+
+
+</body>
+</html>