You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/09/10 14:41:27 UTC

[19/37] cxf git commit: [CXF-6618] Initial code for processing binary data with Tika and pushing them to Spark

[CXF-6618] Initial code for processing binary data with Tika and pushing them to Spark


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/4427f779
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/4427f779
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/4427f779

Branch: refs/heads/master-jaxrs-2.1
Commit: 4427f7790e522144baa325e5b5c051885e9dc706
Parents: d68d8d8
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Thu Sep 8 13:36:36 2016 +0100
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Thu Sep 8 13:36:36 2016 +0100

----------------------------------------------------------------------
 .../release/samples/jax_rs/spark/README.txt     | 18 +++--
 .../main/release/samples/jax_rs/spark/pom.xml   | 15 ++++
 .../jaxrs/server/AdvancedStreamingService.java  | 80 --------------------
 .../src/main/java/demo/jaxrs/server/Server.java |  2 +-
 .../demo/jaxrs/server/SparkStreamingOutput.java |  4 +-
 .../demo/jaxrs/server/StreamingService.java     | 37 ++++++++-
 .../java/demo/jaxrs/server/TikaReceiver.java    | 43 +++++++++++
 .../spark/src/main/resources/multipartForm.html | 49 ++++++++++++
 8 files changed, 154 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/README.txt
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt
index e6a218d..8a7b292 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/README.txt
+++ b/distribution/src/main/release/samples/jax_rs/spark/README.txt
@@ -1,5 +1,5 @@
-JAX-RS Basic Spark Demo 
-=======================
+JAX-RS Spark Streaming Demo 
+===========================
 
 This demo demonstrates how to connect HTTP and Spark streams with JAX-RS
 
@@ -9,15 +9,17 @@ mvn exec:java
 
 Next do: 
 
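+1. Simple text processing (note the service is now published under http://localhost:9000/spark, i.e. http://localhost:9000/spark/stream):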
+
 curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/stream
 
-Limitations: 
+2. PDF processing:
 
-This demo accepts one request at a time due to Spark restricting that only a single streaming context can be active
-in JVM at a given moment of time. This is the error which will be logged if you try to access the demo server concurrently:
+Open multipartForm.html located in src/main/resources in a browser, select any PDF file available on the local disk and upload it.
 
+Note that Spark allows only a single SparkContext to be active in a JVM at any given time.
+This is the error that will be logged if you try to access the demo server concurrently:
 "org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
- To ignore this error, set spark.driver.allowMultipleContexts = true".
- 
- More flexible demo server will be added in due time. 
+
+To ignore this error, set spark.driver.allowMultipleContexts = true".
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/pom.xml b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
index b2541a0..10a00da 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
@@ -44,6 +44,11 @@
             <version>3.2.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-search</artifactId>
+            <version>3.2.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.10</artifactId>
             <version>2.0.0-preview</version>
@@ -59,6 +64,16 @@
             <artifactId>netty</artifactId>
             <version>3.7.0.Final</version>
         </dependency> 
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-core</artifactId>
+            <version>2.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-parser-pdf-module</artifactId>
+            <version>2.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <repositories>

http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
deleted file mode 100644
index 1971fd9..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/AdvancedStreamingService.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server;
-
-import java.io.InputStream;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.StreamingOutput;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.StreamingContext;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.dstream.ReceiverInputDStream;
-import org.apache.spark.streaming.receiver.Receiver;
-
-import scala.reflect.ClassTag;
-
-// INCOMPLETE
-
-@Path("/")
-public class AdvancedStreamingService {
-    private JavaStreamingContext jssc;
-    public AdvancedStreamingService(SparkConf sparkConf) {
-        this.jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
-        new MyReceiverInputDStream(jssc.ssc(), 
-                                   scala.reflect.ClassTag$.MODULE$.apply(String.class));
-    }
-    
-    @POST
-    @Path("/stream")
-    @Consumes("text/plain")
-    @Produces("text/plain")
-    public StreamingOutput getStream(InputStream is) {
-        
-        return null;
-    }
-
-     
-    public static class MyReceiverInputDStream extends ReceiverInputDStream<String> {
-
-        public MyReceiverInputDStream(StreamingContext ssc, ClassTag<String> evidence) {
-            super(ssc, evidence);
-        }
-        public void putInputStream(InputStream is) {
-            
-        }
-        @Override
-        public Receiver<String> getReceiver() {
-            // A receiver can be created per every String the input stream
-            return new InputStreamReceiver(getInputStream());
-        }
-        public InputStream getInputStream() {
-            // TODO Auto-generated method stub
-            return null;
-        }    
-    }
-
-
-    
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
index 50f915a..8a1092f 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/Server.java
@@ -30,7 +30,7 @@ public class Server {
         sf.setResourceClasses(StreamingService.class);
         sf.setResourceProvider(StreamingService.class, 
             new SingletonResourceProvider(new StreamingService()));
-        sf.setAddress("http://localhost:9000/");
+        sf.setAddress("http://localhost:9000/spark");
 
         sf.create();
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
index 43166fe..7324806 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkStreamingOutput.java
@@ -35,16 +35,18 @@ public class SparkStreamingOutput implements StreamingOutput {
     
     private JavaStreamingContext jssc;
     private volatile boolean sparkBatchCompleted;
+    private volatile boolean outputWriteDone;
     public SparkStreamingOutput(JavaStreamingContext jssc) {
         this.jssc = jssc;
     }
 
     @Override
     public void write(final OutputStream output) throws IOException, WebApplicationException {
-        while (!sparkBatchCompleted || !responseQueue.isEmpty()) {
+        while (!sparkBatchCompleted || !outputWriteDone || !responseQueue.isEmpty()) {
             try {
                 String responseEntry = responseQueue.poll(1, TimeUnit.MILLISECONDS);
                 if (responseEntry != null) {
+                    outputWriteDone = true;
                     output.write(StringUtils.toBytesUTF8(responseEntry));
                     output.flush();
                 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
index c9cc033..5e059fc 100644
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/StreamingService.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -35,6 +36,11 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 
+import org.apache.cxf.common.util.Base64Utility;
+import org.apache.cxf.jaxrs.ext.multipart.Attachment;
+import org.apache.cxf.jaxrs.ext.multipart.Multipart;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -47,6 +53,8 @@ import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.tika.parser.pdf.PDFParser;
 
 import scala.Tuple2;
 
@@ -59,20 +67,35 @@ public class StreamingService {
     }
     
     @POST
+    @Path("/multipart")
+    @Consumes("multipart/form-data")
+    @Produces("text/plain")
+    public void processMultipartStream(@Suspended AsyncResponse async, 
+                                       @Multipart("file") Attachment att) {
+        TikaContentExtractor tika = new TikaContentExtractor(new PDFParser());
+        TikaContent tikaContent = tika.extract(att.getObject(InputStream.class));
+        processStream(async, new TikaReceiver(tikaContent));
+    }
+    
+    @POST
     @Path("/stream")
     @Consumes("text/plain")
     @Produces("text/plain")
-    public void getStream(@Suspended AsyncResponse async, InputStream is) {
+    public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
+        processStream(async, new InputStreamReceiver(is));
+    }
+
+    private void processStream(AsyncResponse async, Receiver<String> receiver) {
         try {
-            SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("JAX-RS Spark Connect");
+            SparkConf sparkConf = new SparkConf().setMaster("local[*]")
+                .setAppName("JAX-RS Spark Connect " + getRandomId());
             JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); 
             
             SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
             SparkStreamingListener sparkListener =  new SparkStreamingListener(streamOut);
             jssc.addStreamingListener(sparkListener);
             
-            JavaReceiverInputDStream<String> receiverStream = 
-                jssc.receiverStream(new InputStreamReceiver(is));
+            JavaReceiverInputDStream<String> receiverStream = jssc.receiverStream(receiver);
             JavaPairDStream<String, Integer> wordCounts = createOutputDStream(receiverStream);
             wordCounts.foreachRDD(new OutputFunction(streamOut));
             jssc.start();
@@ -87,6 +110,12 @@ public class StreamingService {
             }
         }
     }
+    
+    private static String getRandomId() {
+        byte[] bytes = new byte[10];
+        new Random().nextBytes(bytes);
+        return Base64Utility.encode(bytes);
+    }
 
     @SuppressWarnings("serial")
     private static JavaPairDStream<String, Integer> createOutputDStream(

http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
new file mode 100644
index 0000000..daab2be
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/TikaReceiver.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package demo.jaxrs.server;
+
+import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+
+public class TikaReceiver extends Receiver<String> {
+
+    private static final long serialVersionUID = 1L;
+    private TikaContent tikaContent;
+    
+    public TikaReceiver(TikaContent tikaContent) {
+        super(StorageLevel.MEMORY_ONLY());
+        this.tikaContent = tikaContent;
+    }
+    @Override
+    public void onStart() {
+        super.store(tikaContent.getContent());
+    }
+    @Override
+    public void onStop() {
+        // complete
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/4427f779/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
----------------------------------------------------------------------
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html b/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
new file mode 100644
index 0000000..264b4f6
--- /dev/null
+++ b/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
@@ -0,0 +1,49 @@
+<html xmlns="http://www.w3.org/1999/xhtml">
+<head>
+    <title>Form Upload</title>
+    <STYLE TYPE="text/css">
+	<!--
+	  input {font-family:verdana, arial, helvetica, sans-serif;font-size:20px;line-height:40px;}
+	  H1 { text-align: center}
+	  div.padded {  
+         padding-left: 5em;  
+      }   
+	-->
+</STYLE>
+</head>
+<body>
+<H1>Form Upload</H1>
+<br/>
+     <form action="http://localhost:9000/spark/multipart"
+           enctype="multipart/form-data" 
+           method="POST">
+       <div class="padded">  
+       <table>    
+        <tr>
+            <td><big><big><big>File:</big></big></big></td>
+            <td>
+               <input id="content" size="50" name="file" type="file"/>
+            </td>
+        </tr>
+        <tr>
+            <td colspan="2">&nbsp;</td>
+        </tr>
+        <tr>
+            <td>
+              &nbsp;
+            </td>
+        </tr>
+        </table>
+        </div>
+        <table align="center">
+        <tr>
+            <td colspan="2">
+                <input type="submit" value="    Upload    "/>
+            </td>
+        </tr>
+        </table>
+  </form>
+ 
+  
+</body>
+</html>