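You are viewing a plain-text version of this content. The canonical version is the original message in the commits@cxf.apache.org mailing-list archive (the hyperlink was lost in this plain-text rendering).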
Posted to commits@cxf.apache.org by co...@apache.org on 2021/09/30 16:43:39 UTC
[cxf] branch master updated: CXF-8602 - Upgrade to Tika 2.x (#858)
This is an automated email from the ASF dual-hosted git repository.
coheigea pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 9ed98f3 CXF-8602 - Upgrade to Tika 2.x (#858)
9ed98f3 is described below
commit 9ed98f36a381b8f881c2d63a5846572ba83a4ed9
Author: Colm O hEigeartaigh <co...@users.noreply.github.com>
AuthorDate: Thu Sep 30 17:43:10 2021 +0100
CXF-8602 - Upgrade to Tika 2.x (#858)
---
.../src/main/release/samples/jax_rs/search/pom.xml | 12 +-
.../main/release/samples/jax_rs/spark/README.txt | 54 ------
.../src/main/release/samples/jax_rs/spark/pom.xml | 96 ----------
.../main/java/demo/jaxrs/server/SparkUtils.java | 113 -----------
.../main/java/demo/jaxrs/server/simple/Server.java | 50 -----
.../java/demo/jaxrs/server/simple/SparkJob.java | 36 ----
.../server/simple/SparkStreamingListener.java | 93 ---------
.../jaxrs/server/simple/SparkStreamingOutput.java | 73 -------
.../demo/jaxrs/server/simple/StreamingService.java | 212 ---------------------
.../jaxrs/server/simple/StringListReceiver.java | 44 -----
.../main/java/demo/jaxrs/server/socket/Server.java | 128 -------------
.../java/demo/jaxrs/server/socket/SparkJob.java | 55 ------
.../demo/jaxrs/server/socket/SparkResultJob.java | 53 ------
.../jaxrs/server/socket/SparkStreamingOutput.java | 62 ------
.../demo/jaxrs/server/socket/StreamingService.java | 115 -----------
.../spark/src/main/resources/multipartForm.html | 49 -----
distribution/src/main/release/samples/pom.xml | 3 +-
parent/pom.xml | 13 +-
rt/rs/extensions/search/pom.xml | 31 +--
.../ext/search/tika/TikaContentExtractorTest.java | 4 +-
.../tika/TikaLuceneContentExtractorTest.java | 24 +--
systests/jaxrs/pom.xml | 36 ++--
.../cxf/systest/jaxrs/extraction/BookCatalog.java | 6 +-
.../extraction/JAXRSClientServerTikaTest.java | 4 +-
24 files changed, 49 insertions(+), 1317 deletions(-)
diff --git a/distribution/src/main/release/samples/jax_rs/search/pom.xml b/distribution/src/main/release/samples/jax_rs/search/pom.xml
index 3bd92c6..b46bbbe 100644
--- a/distribution/src/main/release/samples/jax_rs/search/pom.xml
+++ b/distribution/src/main/release/samples/jax_rs/search/pom.xml
@@ -132,7 +132,17 @@
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
- <artifactId>tika-parsers</artifactId>
+ <artifactId>tika-core</artifactId>
+ <version>${cxf.tika.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-parser-pdf-module</artifactId>
+ <version>${cxf.tika.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-parser-miscoffice-module</artifactId>
<version>${cxf.tika.version}</version>
</dependency>
<dependency>
diff --git a/distribution/src/main/release/samples/jax_rs/spark/README.txt b/distribution/src/main/release/samples/jax_rs/spark/README.txt
deleted file mode 100644
index c143a85..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/README.txt
+++ /dev/null
@@ -1,54 +0,0 @@
-JAX-RS Spark Streaming Demo
-===========================
-
-This demo demonstrates how to connect HTTP and Spark streams with JAX-RS.
-The demo accept simple Strings or binary attachments which are processed with Tika.
-In both cases a list of strings is pushed into a Spark Streaming Pipeline with the
-pipeline output response streamed back to the HTTP client.
-
-Build the demo with "mvn install" and start it with
-
-mvn exec:java -Dexec.mainClass=demo.jaxrs.server.simple.Server
-(uses Spark Receiver initialized with a list of strings)
-
-or
-
-mvn exec:java -Dexec.mainClass=demo.jaxrs.server.simple.Server -DexecArgs=-receiverType=queue
-(uses Spark Queue Receiver initialized with a parallelized data set)
-
-In both cases a new streaming context is created on every request.
-
-You can also try:
-
-mvn exec:java -Dexec.mainClass=demo.jaxrs.server.socket.Server
-
-(Uses a client socket receiver - JAX-RS server will push a list of strings to it
-and will write down the response data it gets back)
-
-
-Next do:
-
-1. Simple text processing:
-
-curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/spark/stream
-
-2. Simple one way text processing:
-
-curl -X POST -H "Accept: text/plain" -H "Content-Type: text/plain" -d "Hello Spark" http://localhost:9000/spark/streamOneWay
-
-3. PDF/ODT/ODP processing:
-
-Open multipart.html located in src/main/resources, locate any PDF or OpenOffice text or presentation file available
-on the local disk and upload.
-
-Note Spark restricts that only a single streaming context can be active in JVM at a given moment of time.
-demo.jaxrs.server.simple.Server creates a new context per every request so this is the error which will be logged
-if you try to access this demo server concurrently:
-
-"org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
-To ignore this error, set spark.driver.allowMultipleContexts = true".
-
-However demo.jaxrs.server.socket.Server creates only a single context and its JAX-RS frontend can process multiple requests concurrently
-without having to set "spark.driver.allowMultipleContexts = true".
-
-
diff --git a/distribution/src/main/release/samples/jax_rs/spark/pom.xml b/distribution/src/main/release/samples/jax_rs/spark/pom.xml
deleted file mode 100644
index 4bab462..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/pom.xml
+++ /dev/null
@@ -1,96 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>jax_rs_spark</artifactId>
- <name>JAX-RS Spark Streaming Demo</name>
- <description>JAX-RS Spark Streaming Demo</description>
- <parent>
- <groupId>org.apache.cxf.samples</groupId>
- <artifactId>cxf-samples</artifactId>
- <version>3.5.0-SNAPSHOT</version>
- <relativePath>../..</relativePath>
- </parent>
- <properties>
- <cxf.version>${project.version}</cxf.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.cxf</groupId>
- <artifactId>cxf-rt-transports-http-jetty</artifactId>
- <version>3.5.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.cxf</groupId>
- <artifactId>cxf-rt-frontend-jaxrs</artifactId>
- <version>3.5.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.cxf</groupId>
- <artifactId>cxf-rt-rs-extension-search</artifactId>
- <version>3.5.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.11</artifactId>
- <version>2.0.0</version>
- <exclusions>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- <version>2.9.9</version>
- </dependency>
- <dependency>
- <groupId>org.apache.tika</groupId>
- <artifactId>tika-parsers</artifactId>
- <version>${cxf.tika.version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>exec</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <executable>java</executable>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
deleted file mode 100644
index e073f22..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/SparkUtils.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package demo.jaxrs.server;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.StringReader;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
-
-import javax.ws.rs.WebApplicationException;
-
-import org.apache.cxf.common.util.Base64Utility;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-
-import scala.Tuple2;
-
-
-public final class SparkUtils {
-
- private SparkUtils() {
- }
-
- public static JavaPairDStream<String, Integer> createOutputDStream(
- JavaDStream<String> receiverStream, boolean withId) {
- final JavaDStream<String> words =
- receiverStream.flatMap(x -> withId ? splitInputStringWithId(x) : splitInputString(x));
-
- final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> {
- return new Tuple2<String, Integer>(s, 1);
- });
- return pairs.reduceByKey((i1, i2) -> {
- return i1 + i2;
- });
- }
- public static Iterator<String> splitInputString(String x) {
- List<String> list = new LinkedList<>();
- for (String s : Arrays.asList(x.split(" "))) {
- s = s.trim();
- if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || s.endsWith(".")) {
- s = s.substring(0, s.length() - 1);
- }
- if (!s.isEmpty()) {
- list.add(s);
- }
- }
- return list.iterator();
- }
- public static Iterator<String> splitInputStringWithId(String x) {
- int index = x.indexOf(':');
- String jobId = x.substring(0, index);
- x = x.substring(index + 1);
-
- List<String> list = new LinkedList<>();
- for (String s : Arrays.asList(x.split(" "))) {
- s = s.trim();
- if (s.endsWith(":") || s.endsWith(",") || s.endsWith(";") || s.endsWith(".")) {
- s = s.substring(0, s.length() - 1);
- }
- if (!s.isEmpty()) {
- list.add(jobId + ":" + s);
- }
- }
- return list.iterator();
- }
- public static String getRandomId() {
- byte[] bytes = new byte[10];
- new Random().nextBytes(bytes);
- return Base64Utility.encode(bytes);
- }
- public static List<String> getStringsFromInputStream(InputStream is) {
- return getStringsFromReader(new BufferedReader(new InputStreamReader(is)));
- }
- public static List<String> getStringsFromString(String s) {
- return getStringsFromReader(new BufferedReader(new StringReader(s)));
- }
- public static List<String> getStringsFromReader(BufferedReader reader) {
-
- List<String> inputStrings = new LinkedList<>();
- String userInput = null;
- try {
- while ((userInput = reader.readLine()) != null) {
- inputStrings.add(userInput);
- }
- } catch (IOException ex) {
- throw new WebApplicationException(ex);
- }
- return inputStrings;
- }
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/Server.java
deleted file mode 100644
index e8d8519..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/Server.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package demo.jaxrs.server.simple;
-
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-
-
-public class Server {
-
- protected Server(String[] args) throws Exception {
- JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
- sf.setResourceClasses(StreamingService.class);
-
- String receiverType = args.length == 1 && "-receiverType=queue".equals(args[0])
- ? "queue" : "string";
- sf.setResourceProvider(StreamingService.class,
- new SingletonResourceProvider(new StreamingService(receiverType)));
- sf.setAddress("http://localhost:9000/spark");
-
- sf.create();
- }
-
- public static void main(String[] args) throws Exception {
- new Server(args);
- System.out.println("Server ready...");
- Thread.sleep(60 * 60 * 1000);
- System.out.println("Server exiting");
- System.exit(0);
- }
-
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkJob.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkJob.java
deleted file mode 100644
index 84a00c7..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkJob.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.simple;
-
-import javax.ws.rs.container.AsyncResponse;
-
-public class SparkJob implements Runnable {
- private AsyncResponse ac;
- private SparkStreamingListener sparkListener;
- public SparkJob(AsyncResponse ac, SparkStreamingListener sparkListener) {
- this.ac = ac;
- this.sparkListener = sparkListener;
- }
- @Override
- public void run() {
- sparkListener.waitForBatchStarted();
- ac.resume(sparkListener.getStreamOut());
- }
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingListener.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingListener.java
deleted file mode 100644
index 0d6a200..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingListener.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.simple;
-
-import org.apache.spark.streaming.scheduler.StreamingListener;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
-import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationCompleted;
-import org.apache.spark.streaming.scheduler.StreamingListenerOutputOperationStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
-import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;
-
-public class SparkStreamingListener implements StreamingListener {
- private SparkStreamingOutput streamOutput;
- private boolean batchStarted;
- private long batchStartAt;
-
- public SparkStreamingListener(SparkStreamingOutput streamOutput) {
- this.streamOutput = streamOutput;
- }
-
- @Override
- public void onBatchCompleted(StreamingListenerBatchCompleted event) {
- System.out.println("Batch processing time in millisecs: " + (System.currentTimeMillis() - batchStartAt));
-
- streamOutput.setSparkBatchCompleted();
- }
-
- @Override
- public synchronized void onBatchStarted(StreamingListenerBatchStarted event) {
- batchStarted = true;
- batchStartAt = System.currentTimeMillis();
- notify();
- }
-
- @Override
- public void onBatchSubmitted(StreamingListenerBatchSubmitted event) {
- }
-
- @Override
- public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted event) {
- }
-
- @Override
- public void onOutputOperationStarted(StreamingListenerOutputOperationStarted event) {
- }
-
- @Override
- public void onReceiverError(StreamingListenerReceiverError event) {
- }
-
- @Override
- public void onReceiverStarted(StreamingListenerReceiverStarted event) {
- }
-
- @Override
- public void onReceiverStopped(StreamingListenerReceiverStopped arg0) {
- }
-
- public SparkStreamingOutput getStreamOut() {
- return streamOutput;
- }
-
- public synchronized void waitForBatchStarted() {
- while (!batchStarted) {
- try {
- this.wait();
- } catch (InterruptedException ex) {
- // continue
- }
- }
-
- }
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingOutput.java
deleted file mode 100644
index 8442bc3..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/SparkStreamingOutput.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.simple;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.StreamingOutput;
-
-import org.apache.cxf.common.util.StringUtils;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class SparkStreamingOutput implements StreamingOutput {
- private BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();
-
- private JavaStreamingContext jssc;
- private volatile boolean sparkBatchCompleted;
- private volatile boolean outputWriteDone;
- private long startAt;
- public SparkStreamingOutput(JavaStreamingContext jssc) {
- this.jssc = jssc;
- this.startAt = System.currentTimeMillis();
- }
-
- @Override
- public void write(final OutputStream output) throws IOException, WebApplicationException {
- while (!sparkBatchCompleted || !outputWriteDone || !responseQueue.isEmpty()) {
- try {
- String responseEntry = responseQueue.poll(1, TimeUnit.MILLISECONDS);
- if (responseEntry != null) {
- outputWriteDone = true;
- output.write(StringUtils.toBytesUTF8(responseEntry));
- output.flush();
- }
- } catch (InterruptedException e) {
- // continue;
- }
- }
-
- jssc.stop(false);
- jssc.close();
- System.out.println("Total processing time in millisecs: " + (System.currentTimeMillis() - startAt));
- }
-
-
- public void setSparkBatchCompleted() {
- this.sparkBatchCompleted = true;
- }
-
- public void addResponseEntry(String value) {
- responseQueue.add(value);
- }
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
deleted file mode 100644
index f7cd34a..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StreamingService.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.simple;
-
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.jaxrs.ext.Oneway;
-import org.apache.cxf.jaxrs.ext.multipart.Attachment;
-import org.apache.cxf.jaxrs.ext.multipart.Multipart;
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import demo.jaxrs.server.SparkUtils;
-
-
-@Path("/")
-public class StreamingService {
- private static final Map<String, MediaType> MEDIA_TYPE_TABLE;
- static {
- MEDIA_TYPE_TABLE = new HashMap<>();
- MEDIA_TYPE_TABLE.put("pdf", MediaType.valueOf("application/pdf"));
- MEDIA_TYPE_TABLE.put("odt", MediaType.valueOf("application/vnd.oasis.opendocument.text"));
- MEDIA_TYPE_TABLE.put("odp", MediaType.valueOf("application/vnd.oasis.opendocument.presentation"));
- }
- private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(10));
-
- private String receiverType;
-
- public StreamingService(String receiverType) {
- this.receiverType = receiverType;
- }
-
- @POST
- @Path("/multipart")
- @Consumes("multipart/form-data")
- @Produces("text/plain")
- public void processMultipartStream(@Suspended AsyncResponse async,
- @Multipart("file") Attachment att) {
- MediaType mediaType = att.getContentType();
- if (mediaType == null) {
- String fileName = att.getContentDisposition().getFilename();
- if (fileName != null) {
- int extDot = fileName.lastIndexOf('.');
- if (extDot > 0) {
- mediaType = MEDIA_TYPE_TABLE.get(fileName.substring(extDot + 1));
- }
- }
- }
-
- TikaContentExtractor tika = new TikaContentExtractor();
- TikaContent tikaContent = tika.extract(att.getObject(InputStream.class),
- mediaType);
- processStream(async, SparkUtils.getStringsFromString(tikaContent.getContent()));
- }
-
- @POST
- @Path("/stream")
- @Consumes("text/plain")
- @Produces("text/plain")
- public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
- processStream(async, SparkUtils.getStringsFromInputStream(is));
- }
- @POST
- @Path("/streamOneWay")
- @Consumes("text/plain")
- @Oneway
- public void processSimpleStreamOneWay(InputStream is) {
- processStreamOneWay(SparkUtils.getStringsFromInputStream(is));
- }
-
- private void processStream(AsyncResponse async, List<String> inputStrings) {
- try {
- SparkConf sparkConf = new SparkConf().setMaster("local[*]")
- .setAppName("JAX-RS Spark Connect " + SparkUtils.getRandomId());
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
-
- SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
- SparkStreamingListener sparkListener = new SparkStreamingListener(streamOut);
- jssc.addStreamingListener(sparkListener);
-
- JavaDStream<String> receiverStream = null;
- if ("queue".equals(receiverType)) {
- Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
- for (int i = 0; i < 30; i++) {
- rddQueue.add(jssc.sparkContext().parallelize(inputStrings));
- }
- receiverStream = jssc.queueStream(rddQueue);
- } else {
- receiverStream = jssc.receiverStream(new StringListReceiver(inputStrings));
- }
-
- JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, false);
- wordCounts.foreachRDD(new OutputFunction(streamOut));
- jssc.start();
-
- executor.execute(new SparkJob(async, sparkListener));
- } catch (Exception ex) {
- // the compiler does not allow to catch SparkException directly
- if (ex instanceof SparkException) {
- async.cancel(60);
- } else {
- async.resume(new WebApplicationException(ex));
- }
- }
- }
-
- private void processStreamOneWay(List<String> inputStrings) {
- try {
- SparkConf sparkConf = new SparkConf().setMaster("local[*]")
- .setAppName("JAX-RS Spark Connect OneWay " + SparkUtils.getRandomId());
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
-
- JavaDStream<String> receiverStream = null;
- if ("queue".equals(receiverType)) {
- Queue<JavaRDD<String>> rddQueue = new LinkedList<>();
- for (int i = 0; i < 30; i++) {
- rddQueue.add(jssc.sparkContext().parallelize(inputStrings));
- }
- receiverStream = jssc.queueStream(rddQueue);
- } else {
- receiverStream = jssc.receiverStream(new StringListReceiver(inputStrings));
- }
-
- JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, false);
- wordCounts.foreachRDD(new PrintOutputFunction(jssc));
- jssc.start();
- } catch (Exception ex) {
- // ignore
- }
- }
-
-
- private static class OutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
- private static final long serialVersionUID = 1L;
- private SparkStreamingOutput streamOut;
- OutputFunction(SparkStreamingOutput streamOut) {
- this.streamOut = streamOut;
- }
- @Override
- public void call(JavaPairRDD<String, Integer> rdd) {
- for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
- String value = entry.getKey() + " : " + entry.getValue() + "\n";
- streamOut.addResponseEntry(value);
- }
- }
-
- }
- private static class PrintOutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
- private static final long serialVersionUID = 1L;
- private JavaStreamingContext jssc;
- PrintOutputFunction(JavaStreamingContext jssc) {
- this.jssc = jssc;
- }
- @Override
- public void call(JavaPairRDD<String, Integer> rdd) {
- if (!rdd.collectAsMap().isEmpty()) {
- for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
- String value = entry.getKey() + " : " + entry.getValue();
- System.out.println(value);
- }
- jssc.stop(false);
- jssc.close();
- }
- }
-
- }
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StringListReceiver.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StringListReceiver.java
deleted file mode 100644
index 76432cc..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/simple/StringListReceiver.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.simple;
-
-import java.util.List;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.receiver.Receiver;
-
-public class StringListReceiver extends Receiver<String> {
-
- private static final long serialVersionUID = 1L;
- private List<String> inputStrings;
-
- public StringListReceiver(List<String> inputStrings) {
- super(StorageLevel.MEMORY_ONLY());
- this.inputStrings = inputStrings;
- }
- @Override
- public void onStart() {
- super.store(inputStrings.iterator());
- }
- @Override
- public void onStop() {
- // complete
- }
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
deleted file mode 100644
index 6beac06..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/Server.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package demo.jaxrs.server.socket;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.PrintStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Map;
-
-import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
-import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.StorageLevels;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import demo.jaxrs.server.SparkUtils;
-import demo.jaxrs.server.simple.SparkStreamingListener;
-import demo.jaxrs.server.simple.SparkStreamingOutput;
-
-
-public class Server {
-
- protected Server(String[] args) throws Exception {
-
- ServerSocket sparkServerSocket = new ServerSocket(9999);
- ServerSocket jaxrsResponseServerSocket = new ServerSocket(10000);
- Socket jaxrsResponseClientSocket = new Socket("localhost", 10000);
-
-
- SparkConf sparkConf = new SparkConf().setMaster("local[*]")
- .setAppName("JAX-RS Spark Socket Connect");
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
-
- SparkStreamingOutput streamOut = new SparkStreamingOutput(jssc);
- SparkStreamingListener sparkListener = new SparkStreamingListener(streamOut);
- jssc.addStreamingListener(sparkListener);
-
- JavaDStream<String> receiverStream = jssc.socketTextStream(
- "localhost", 9999, StorageLevels.MEMORY_ONLY);
-
- JavaPairDStream<String, Integer> wordCounts = SparkUtils.createOutputDStream(receiverStream, true);
- PrintStream sparkResponseOutputStream = new PrintStream(jaxrsResponseClientSocket.getOutputStream(), true);
- wordCounts.foreachRDD(new SocketOutputFunction(sparkResponseOutputStream));
-
- jssc.start();
-
- Socket receiverClientSocket = sparkServerSocket.accept();
- PrintStream sparkOutputStream = new PrintStream(receiverClientSocket.getOutputStream(), true);
- BufferedReader sparkInputStream =
- new BufferedReader(new InputStreamReader(jaxrsResponseServerSocket.accept().getInputStream()));
-
-
- JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
-
- sf.setResourceClasses(StreamingService.class);
- sf.setResourceProvider(StreamingService.class,
- new SingletonResourceProvider(new StreamingService(sparkInputStream,
- sparkOutputStream)));
- sf.setAddress("http://localhost:9000/spark");
- sf.create();
-
- jssc.awaitTermination();
- sparkServerSocket.close();
- jaxrsResponseServerSocket.close();
- jaxrsResponseClientSocket.close();
-
- }
-
- public static void main(String[] args) throws Exception {
- new Server(args);
- System.out.println("Server ready...");
- Thread.sleep(60 * 60 * 1000);
- System.out.println("Server exiting");
- System.exit(0);
- }
-
- private static class SocketOutputFunction implements VoidFunction<JavaPairRDD<String, Integer>> {
- private static final long serialVersionUID = 1L;
- private PrintStream streamOut;
- SocketOutputFunction(PrintStream streamOut) {
- this.streamOut = streamOut;
- }
- @Override
- public void call(JavaPairRDD<String, Integer> rdd) {
- if (!rdd.collectAsMap().isEmpty()) {
- String jobId = null;
- PrintStream printStream = null;
- for (Map.Entry<String, Integer> entry : rdd.collectAsMap().entrySet()) {
- String value = entry.getKey() + " : " + entry.getValue();
- if (jobId == null) {
- int index = value.indexOf(':');
- jobId = value.substring(0, index);
- printStream = "oneway".equals(jobId) ? System.out : streamOut;
-
- }
- printStream.println(value);
- }
- printStream.println(jobId + ":" + "<batchEnd>");
- }
- }
-
- }
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
deleted file mode 100644
index f07c1e6..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkJob.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.socket;
-
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.ws.rs.container.AsyncResponse;
-
-import demo.jaxrs.server.SparkUtils;
-
-public class SparkJob implements Runnable {
- private AsyncResponse ac;
- private Map<String, BlockingQueue<String>> sparkResponses;
- private PrintStream sparkOutputStream;
- private List<String> inputStrings;
- public SparkJob(AsyncResponse ac, Map<String, BlockingQueue<String>> sparkResponses,
- PrintStream sparkOutputStream, List<String> inputStrings) {
- this.ac = ac;
- this.inputStrings = inputStrings;
- this.sparkResponses = sparkResponses;
- this.sparkOutputStream = sparkOutputStream;
- }
- @Override
- public void run() {
- String jobId = SparkUtils.getRandomId();
- BlockingQueue<String> queue = new LinkedBlockingQueue<>();
- sparkResponses.put(jobId, queue);
-
- for (String s : inputStrings) {
- sparkOutputStream.println(jobId + ":" + s);
- }
- ac.resume(new SparkStreamingOutput(sparkResponses, jobId, queue));
- }
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java
deleted file mode 100644
index d774edf..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkResultJob.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.socket;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-public class SparkResultJob implements Runnable {
-
- private Map<String, BlockingQueue<String>> sparkResponses;
- private BufferedReader sparkInputStream;
- public SparkResultJob(Map<String, BlockingQueue<String>> sparkResponses,
- BufferedReader sparkInputStream) {
- this.sparkResponses = sparkResponses;
- this.sparkInputStream = sparkInputStream;
- }
-
-
- @Override
- public void run() {
- try {
- String s = null;
- while ((s = sparkInputStream.readLine()) != null) {
- int index = s.indexOf(':');
- String jobId = s.substring(0, index);
- String value = s.substring(index + 1);
- sparkResponses.get(jobId).offer(value);
- }
- } catch (IOException ex) {
- // ignore
- }
-
- }
-
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
deleted file mode 100644
index 7972843..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/SparkStreamingOutput.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.socket;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.StreamingOutput;
-
-public class SparkStreamingOutput implements StreamingOutput {
- private Map<String, BlockingQueue<String>> sparkResponses;
- private String jobId;
- private BlockingQueue<String> queue;
- public SparkStreamingOutput(Map<String, BlockingQueue<String>> sparkResponses, String jobId,
- BlockingQueue<String> queue) {
- this.sparkResponses = sparkResponses;
- this.jobId = jobId;
- this.queue = queue;
- }
-
- @Override
- public void write(final OutputStream output) throws IOException, WebApplicationException {
- PrintStream out = new PrintStream(output, true);
- try {
- while (true) {
- String responseEntry = queue.poll(1, TimeUnit.MILLISECONDS);
- if (responseEntry != null) {
- if ("<batchEnd>".equals(responseEntry)) {
- sparkResponses.remove(jobId);
- break;
- } else {
- out.println(responseEntry);
- }
- }
- }
- } catch (InterruptedException ex) {
- // ignore
- }
-
- }
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java b/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
deleted file mode 100644
index cb6980d..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/java/demo/jaxrs/server/socket/StreamingService.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package demo.jaxrs.server.socket;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.container.Suspended;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.jaxrs.ext.Oneway;
-import org.apache.cxf.jaxrs.ext.multipart.Attachment;
-import org.apache.cxf.jaxrs.ext.multipart.Multipart;
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor;
-import org.apache.cxf.jaxrs.ext.search.tika.TikaContentExtractor.TikaContent;
-
-import demo.jaxrs.server.SparkUtils;
-
-
-@Path("/")
-public class StreamingService {
- private static final Map<String, MediaType> MEDIA_TYPE_TABLE;
- static {
- MEDIA_TYPE_TABLE = new HashMap<>();
- MEDIA_TYPE_TABLE.put("pdf", MediaType.valueOf("application/pdf"));
- MEDIA_TYPE_TABLE.put("odt", MediaType.valueOf("application/vnd.oasis.opendocument.text"));
- MEDIA_TYPE_TABLE.put("odp", MediaType.valueOf("application/vnd.oasis.opendocument.presentation"));
- }
- private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(10));
- private Map<String, BlockingQueue<String>> sparkResponses = new ConcurrentHashMap<>();
- private PrintStream sparkOutputStream;
-
- public StreamingService(BufferedReader sparkInputStream, PrintStream sparkOutputStream) {
- this.sparkOutputStream = sparkOutputStream;
- executor.execute(new SparkResultJob(sparkResponses, sparkInputStream));
- }
-
- @POST
- @Path("/multipart")
- @Consumes("multipart/form-data")
- @Produces("text/plain")
- public void processMultipartStream(@Suspended AsyncResponse async,
- @Multipart("file") Attachment att) {
- MediaType mediaType = att.getContentType();
- if (mediaType == null) {
- String fileName = att.getContentDisposition().getFilename();
- if (fileName != null) {
- int extDot = fileName.lastIndexOf('.');
- if (extDot > 0) {
- mediaType = MEDIA_TYPE_TABLE.get(fileName.substring(extDot + 1));
- }
- }
- }
-
- TikaContentExtractor tika = new TikaContentExtractor();
- TikaContent tikaContent = tika.extract(att.getObject(InputStream.class),
- mediaType);
- processStream(async, SparkUtils.getStringsFromString(tikaContent.getContent()));
- }
-
- @POST
- @Path("/stream")
- @Consumes("text/plain")
- @Produces("text/plain")
- public void processSimpleStream(@Suspended AsyncResponse async, InputStream is) {
- processStream(async, SparkUtils.getStringsFromInputStream(is));
- }
-
- private void processStream(AsyncResponse async, List<String> inputStrings) {
- executor.execute(
- new SparkJob(async, sparkResponses, sparkOutputStream, inputStrings));
- }
-
- @POST
- @Path("/streamOneWay")
- @Consumes("text/plain")
- @Oneway
- public void processSimpleStreamOneWay(InputStream is) {
- for (String s : SparkUtils.getStringsFromInputStream(is)) {
- sparkOutputStream.println("oneway:" + s);
- }
- }
-}
diff --git a/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html b/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
deleted file mode 100644
index 264b4f6..0000000
--- a/distribution/src/main/release/samples/jax_rs/spark/src/main/resources/multipartForm.html
+++ /dev/null
@@ -1,49 +0,0 @@
-<html xmlns="http://www.w3.org/1999/xhtml">
-<head>
- <title>Form Upload</title>
- <STYLE TYPE="text/css">
- <!--
- input {font-family:verdana, arial, helvetica, sans-serif;font-size:20px;line-height:40px;}
- H1 { text-align: center}
- div.padded {
- padding-left: 5em;
- }
- -->
-</STYLE>
-</head>
-<body>
-<H1>Form Upload</H1>
-<br/>
- <form action="http://localhost:9000/spark/multipart"
- enctype="multipart/form-data"
- method="POST">
- <div class="padded">
- <table>
- <tr>
- <td><big><big><big>File:</big></big></big></td>
- <td>
- <input id="content" size="50" name="file" type="file"/>
- </td>
- </tr>
- <tr>
- <td colspan="2"> </td>
- </tr>
- <tr>
- <td>
-
- </td>
- </tr>
- </table>
- </div>
- <table align="center">
- <tr>
- <td colspan="2">
- <input type="submit" value=" Upload "/>
- </td>
- </tr>
- </table>
- </form>
-
-
-</body>
-</html>
diff --git a/distribution/src/main/release/samples/pom.xml b/distribution/src/main/release/samples/pom.xml
index 36491d8..2f78219 100644
--- a/distribution/src/main/release/samples/pom.xml
+++ b/distribution/src/main/release/samples/pom.xml
@@ -35,7 +35,7 @@
<cxf.jetty9.version>9.4.43.v20210629</cxf.jetty9.version>
<cxf.httpcomponents.client.version>4.5.13</cxf.httpcomponents.client.version>
<cxf.swagger.ui.version>3.52.1</cxf.swagger.ui.version>
- <cxf.tika.version>1.27</cxf.tika.version>
+ <cxf.tika.version>2.1.0</cxf.tika.version>
<cxf.tomcat.version>9.0.53</cxf.tomcat.version>
<graalvm.version>21.1.0</graalvm.version>
</properties>
@@ -86,7 +86,6 @@
<module>jax_rs/minimal_osgi</module>
<module>jax_rs/odata</module>
<module>jax_rs/search</module>
- <module>jax_rs/spark</module>
<module>jax_rs/spring_boot</module>
<module>jax_rs/spring_boot_scan/application</module>
<module>jax_rs/spring_boot_scan/client</module>
diff --git a/parent/pom.xml b/parent/pom.xml
index 39b487f..515179b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -205,7 +205,7 @@
<cxf.swagger.v3.version>2.1.11</cxf.swagger.v3.version>
<cxf.swagger2.version>1.6.3</cxf.swagger2.version>
<cxf.swagger2.guava.version>27.0-jre</cxf.swagger2.guava.version>
- <cxf.tika.version>1.27</cxf.tika.version>
+ <cxf.tika.version>2.1.0</cxf.tika.version>
<cxf.tomcat.version>9.0.53</cxf.tomcat.version>
<cxf.tomitribe.http.signature.version>1.7</cxf.tomitribe.http.signature.version>
<cxf.undertow.osgi.version>[1.4,3.0)</cxf.undertow.osgi.version>
@@ -1881,17 +1881,6 @@
<version>${cxf.tika.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.tika</groupId>
- <artifactId>tika-parsers</artifactId>
- <version>${cxf.tika.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-jexl3</artifactId>
<version>${cxf.jexl.version}</version>
diff --git a/rt/rs/extensions/search/pom.xml b/rt/rs/extensions/search/pom.xml
index 40df271..5d19f13 100644
--- a/rt/rs/extensions/search/pom.xml
+++ b/rt/rs/extensions/search/pom.xml
@@ -84,39 +84,12 @@
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
<optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>javax.xml.bind</groupId>
- <artifactId>jaxb-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.activation</groupId>
- <artifactId>activation</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
- <artifactId>tika-parsers</artifactId>
+ <artifactId>tika-parser-pdf-module</artifactId>
+ <version>${cxf.tika.version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.poi</groupId>
- <artifactId>poi-ooxml</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.xml.bind</groupId>
- <artifactId>jaxb-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.activation</groupId>
- <artifactId>javax.activation-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.annotation</groupId>
- <artifactId>javax.annotation-api</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
diff --git a/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaContentExtractorTest.java b/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaContentExtractorTest.java
index 3ed4116..e53ebf1 100644
--- a/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaContentExtractorTest.java
+++ b/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaContentExtractorTest.java
@@ -46,7 +46,7 @@ public class TikaContentExtractorTest {
@Test
public void testExtractedTextContentMatchesSearchCriteria() throws Exception {
- SearchCondition<SearchBean> sc = parser.parse("Author==Bertrand*");
+ SearchCondition<SearchBean> sc = parser.parse("dc:creator==Bertrand*");
final SearchBean bean = extractor.extractMetadataToSearchBean(
getClass().getResourceAsStream("/files/testPDF.pdf"));
assertNotNull("Document should not be null", bean);
@@ -54,7 +54,7 @@ public class TikaContentExtractorTest {
}
@Test
public void testExtractedTextContentDoesNotMatchSearchCriteria() throws Exception {
- SearchCondition<SearchBean> sc = parser.parse("Author==Barry*");
+ SearchCondition<SearchBean> sc = parser.parse("dc:creator==Barry*");
final SearchBean bean = extractor.extractMetadataToSearchBean(
getClass().getResourceAsStream("/files/testPDF.pdf"));
assertNotNull("Document should not be null", bean);
diff --git a/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaLuceneContentExtractorTest.java b/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaLuceneContentExtractorTest.java
index f259e4c..e14d0fe 100644
--- a/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaLuceneContentExtractorTest.java
+++ b/rt/rs/extensions/search/src/test/java/org/apache/cxf/jaxrs/ext/search/tika/TikaLuceneContentExtractorTest.java
@@ -90,13 +90,14 @@ public class TikaLuceneContentExtractorTest {
assertEquals(1, getHits("ct==incubation").length);
assertEquals(0, getHits("ct==toolsuite").length);
// meta-data
- assertEquals(1, getHits("Author==Bertrand*").length);
+ assertEquals(1, getHits("dc:creator==Bertrand*").length);
}
@Test
public void testExtractedTextContentMatchesTypesAndDateSearchCriteria() throws Exception {
final LuceneDocumentMetadata documentMetadata = new LuceneDocumentMetadata("contents")
- .withField("modified", Date.class);
+ .withField("modified", Date.class)
+ .withField("dcterms:modified", Date.class);
final Document document = extractor.extract(
getClass().getResourceAsStream("/files/testPDF.pdf"), documentMetadata);
@@ -105,15 +106,16 @@ public class TikaLuceneContentExtractorTest {
writer.addDocument(document);
writer.commit();
// testPDF.pdf 'modified' is set to '2007-09-14T09:02:31Z'
- assertEquals(1, getHits("modified=gt=2007-09-14T09:02:31Z", documentMetadata.getFieldTypes()).length);
- assertEquals(1, getHits("modified=le=2007-09-15T09:02:31-0500", documentMetadata.getFieldTypes()).length);
- assertEquals(1, getHits("modified=ge=2007-09-15", documentMetadata.getFieldTypes()).length);
- assertEquals(1, getHits("modified==2007-09-15", documentMetadata.getFieldTypes()).length);
- assertEquals(0, getHits("modified==2007-09-16", documentMetadata.getFieldTypes()).length);
- assertEquals(0, getHits("modified=gt=2007-09-16", documentMetadata.getFieldTypes()).length);
- assertEquals(0, getHits("modified=lt=2007-09-15", documentMetadata.getFieldTypes()).length);
- assertEquals(0, getHits("modified=gt=2007-09-16T09:02:31", documentMetadata.getFieldTypes()).length);
- assertEquals(0, getHits("modified=lt=2007-09-01T09:02:31", documentMetadata.getFieldTypes()).length);
+ assertEquals(1, getHits("dcterms:modified=gt=2007-09-14T09:02:31Z", documentMetadata.getFieldTypes()).length);
+ assertEquals(1, getHits("dcterms:modified=le=2007-09-15T09:02:31-0500",
+ documentMetadata.getFieldTypes()).length);
+ assertEquals(1, getHits("dcterms:modified=ge=2007-09-15", documentMetadata.getFieldTypes()).length);
+ assertEquals(1, getHits("dcterms:modified==2007-09-15", documentMetadata.getFieldTypes()).length);
+ assertEquals(0, getHits("dcterms:modified==2007-09-16", documentMetadata.getFieldTypes()).length);
+ assertEquals(0, getHits("dcterms:modified=gt=2007-09-16", documentMetadata.getFieldTypes()).length);
+ assertEquals(0, getHits("dcterms:modified=lt=2007-09-15", documentMetadata.getFieldTypes()).length);
+ assertEquals(0, getHits("dcterms:modified=gt=2007-09-16T09:02:31", documentMetadata.getFieldTypes()).length);
+ assertEquals(0, getHits("dcterms:modified=lt=2007-09-01T09:02:31", documentMetadata.getFieldTypes()).length);
}
@Test
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index 45ded74..fd8fedc 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -462,6 +462,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <scope>test</scope>
+ <version>${cxf.httpcomponents.client.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<scope>test</scope>
@@ -480,30 +486,14 @@
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
- <artifactId>tika-parsers</artifactId>
+ <artifactId>tika-core</artifactId>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.poi</groupId>
- <artifactId>poi-ooxml</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.xml.bind</groupId>
- <artifactId>jaxb-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.xml.bind</groupId>
- <artifactId>jaxb-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.annotation</groupId>
- <artifactId>javax.annotation-api</artifactId>
- </exclusion>
- </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-parser-pdf-module</artifactId>
+ <scope>test</scope>
+ <version>${cxf.tika.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/BookCatalog.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/BookCatalog.java
index 14124d9..26dc92f 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/BookCatalog.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/BookCatalog.java
@@ -74,8 +74,9 @@ public class BookCatalog {
if (handler != null) {
final String source = handler.getName();
final LuceneDocumentMetadata metadata = new LuceneDocumentMetadata()
- .withSource(source)
- .withField("modified", Date.class);
+ .withSource(source)
+ .withField("modified", Date.class)
+ .withField("dcterms:modified", Date.class);
final Document document = extractor.extract(handler.getInputStream(), metadata);
if (document != null) {
@@ -117,6 +118,7 @@ public class BookCatalog {
private static LuceneQueryVisitor< SearchBean > createVisitor() {
final Map< String, Class< ? > > fieldTypes = new HashMap<>();
fieldTypes.put("modified", Date.class);
+ fieldTypes.put("dcterms:modified", Date.class);
LuceneQueryVisitor<SearchBean> visitor = new LuceneQueryVisitor<>("ct", "contents");
visitor.setPrimitiveFieldTypeMap(fieldTypes);
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/JAXRSClientServerTikaTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/JAXRSClientServerTikaTest.java
index 7f0475f..905ded8 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/JAXRSClientServerTikaTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/extraction/JAXRSClientServerTikaTest.java
@@ -104,7 +104,7 @@ public class JAXRSClientServerTikaTest extends AbstractClientServerTestBase {
getClass().getResourceAsStream("/files/testPDF.pdf"), disposition);
wc.post(new MultipartBody(attachment));
- final Collection<ScoreDoc> hits = search("modified=le=2007-09-16T09:00:00");
+ final Collection<ScoreDoc> hits = search("dcterms:modified=le=2007-09-16T09:00:00");
assertEquals(hits.size(), 1);
}
@@ -118,7 +118,7 @@ public class JAXRSClientServerTikaTest extends AbstractClientServerTestBase {
wc.post(new MultipartBody(attachment));
// Use user-defined date pattern
- final Collection<ScoreDoc> hits = search("modified=le=2007/09/16");
+ final Collection<ScoreDoc> hits = search("dcterms:modified=le=2007/09/16");
assertEquals(hits.size(), 1);
}