You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/04/28 15:45:29 UTC
[tika] branch main updated: TIKA-3370 -- refactor AsyncProcessor
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 05709ee TIKA-3370 -- refactor AsyncProcessor
05709ee is described below
commit 05709eef16d1b290467aa0c2b96ed4f9cd8d2489
Author: tallison <ta...@apache.org>
AuthorDate: Wed Apr 28 11:45:00 2021 -0400
TIKA-3370 -- refactor AsyncProcessor
---
tika-core/pom.xml | 6 +
.../java/org/apache/tika/pipes/FetchEmitTuple.java | 3 +-
.../java/org/apache/tika/pipes/HandlerConfig.java | 8 +-
.../org/apache/tika/pipes/async/AsyncClient.java | 142 +++++
.../apache/tika/pipes/async/AsyncClientConfig.java | 23 +-
.../org/apache/tika/pipes/async/AsyncConfig.java | 21 +-
.../org/apache/tika/pipes/async}/AsyncEmitter.java | 18 +-
.../tika/pipes/async/AsyncEmitterConfig.java | 9 +-
.../apache/tika/pipes/async/AsyncProcessor.java | 199 +++++++
.../tika/pipes/async/AsyncRuntimeException.java | 0
.../org/apache/tika/pipes/async/AsyncServer.java | 337 ++++++++++++
.../tika/pipes/async/OfferLargerThanQueueSize.java | 26 +-
.../org/apache/tika/pipes/emitter/EmitData.java | 7 +-
.../org/apache/tika/pipes/emitter/EmitKey.java | 9 +-
.../org/apache/tika/pipes/fetcher/FetchKey.java | 9 +-
.../tika/pipes/async/AsyncProcessorTest.java | 85 +--
.../org/apache/tika/pipes/async/MockEmitter.java | 62 +++
.../org/apache/tika/pipes/async/MockFetcher.java | 0
tika-pipes/pom.xml | 1 -
tika-pipes/tika-pipes-async/pom.xml | 101 ----
.../java/org/apache/tika/pipes/async/AsyncCli.java | 358 -------------
.../org/apache/tika/pipes/async/AsyncConfig.java | 76 ---
.../org/apache/tika/pipes/async/AsyncData.java | 57 --
.../org/apache/tika/pipes/async/AsyncEmitter.java | 125 -----
.../tika/pipes/async/AsyncEmitterProcess.java | 379 -------------
.../tika/pipes/async/AsyncPipesEmitHook.java | 61 ---
.../apache/tika/pipes/async/AsyncProcessor.java | 594 ---------------------
.../org/apache/tika/pipes/async/AsyncTask.java | 54 --
.../org/apache/tika/pipes/async/AsyncWorker.java | 195 -------
.../tika/pipes/async/AsyncWorkerProcess.java | 505 ------------------
.../src/main/resources/log4j.properties | 22 -
.../org/apache/tika/pipes/async/MockEmitter.java | 103 ----
.../apache/tika/pipes/async/SerializationTest.java | 49 --
tika-server/tika-server-core/pom.xml | 1 -
.../org/apache/tika/server/core/TikaServerCli.java | 3 +-
.../apache/tika/server/core/TikaServerConfig.java | 13 +-
.../apache/tika/server/core/TikaServerProcess.java | 63 +--
.../tika/server/core/resource/AsyncParser.java | 139 -----
.../tika/server/core/resource/AsyncResource.java | 51 +-
.../core/TikaServerAsyncIntegrationTest.java | 21 +-
.../tika/server/core/TikaServerConfigTest.java | 7 +
41 files changed, 971 insertions(+), 2971 deletions(-)
diff --git a/tika-core/pom.xml b/tika-core/pom.xml
index 93a8dc2..ff98d41 100644
--- a/tika-core/pom.xml
+++ b/tika-core/pom.xml
@@ -89,6 +89,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
index 35621e9..ac5383c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java
@@ -16,13 +16,14 @@
*/
package org.apache.tika.pipes;
+import java.io.Serializable;
import java.util.Objects;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
-public class FetchEmitTuple {
+public class FetchEmitTuple implements Serializable {
public static final ON_PARSE_EXCEPTION DEFAULT_ON_PARSE_EXCEPTION = ON_PARSE_EXCEPTION.EMIT;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
index 93e7a98..5272642 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
@@ -16,11 +16,17 @@
*/
package org.apache.tika.pipes;
+import java.io.Serializable;
import java.util.Objects;
import org.apache.tika.sax.BasicContentHandlerFactory;
-public class HandlerConfig {
+public class HandlerConfig implements Serializable {
+
+ /**
+ * Serial version UID
+ */
+ private static final long serialVersionUID = -3861669115439125268L;
public static HandlerConfig DEFAULT_HANDLER_CONFIG =
new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1, -1);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
new file mode 100644
index 0000000..83baa94
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClient.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.utils.ProcessUtils;
+
+/**
+ * Client side of the fork/pipe protocol: starts an {@link AsyncServer} in a child JVM,
+ * sends it one java-serialized {@link FetchEmitTuple} per call over the child's
+ * stdin, and reads the serialized {@link EmitData} back from the child's stdout.
+ * The child is restarted whenever it stops answering pings.
+ */
+public class AsyncClient implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncClient.class);
+
+    //TODO: make these configurable
+    private long parseTimeoutMillis = 30000;
+    private long waitTimeoutMillis = 500000;
+
+    private Process process;
+    private final Path tikaConfigPath;
+    private DataOutputStream output;
+    private DataInputStream input;
+
+    //number of requests the server answered, either with emit data or with an OOM report
+    private int filesProcessed = 0;
+
+    /**
+     * @param tikaConfigPath tika-config handed to every forked {@link AsyncServer}; no
+     *                       child process is started until the first {@link #process} call
+     */
+    public AsyncClient(Path tikaConfigPath) {
+        this.tikaConfigPath = tikaConfigPath;
+    }
+
+    /**
+     * @return how many tuples the forked server has answered so far
+     */
+    public int getFilesProcessed() {
+        return filesProcessed;
+    }
+
+    /**
+     * Round-trips a {@link AsyncServer#PING} byte.
+     *
+     * @return true only if the child process exists, is alive and echoed the ping
+     */
+    private boolean ping() {
+        if (process == null || !process.isAlive()) {
+            return false;
+        }
+        try {
+            output.write(AsyncServer.PING);
+            output.flush();
+            return input.read() == AsyncServer.PING;
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Forcibly kills the forked server, if one was ever started.
+     */
+    @Override
+    public void close() {
+        //process is still null if process() was never called; without this check,
+        //try-with-resources around an unused client threw a NullPointerException
+        if (process != null) {
+            process.destroyForcibly();
+        }
+    }
+
+    /**
+     * Sends one tuple to the forked server (restarting the server if it doesn't answer a
+     * ping) and blocks until the server replies.
+     *
+     * @param t tuple to fetch, parse and turn into emit data
+     * @return the emit data, or null if the server reported an OutOfMemoryError
+     * @throws IOException if the server can't be started, or replies with any status other
+     *                     than READY/OOM (including end-of-stream after a crash or timeout)
+     */
+    public EmitData process(FetchEmitTuple t) throws IOException {
+        if (!ping()) {
+            restart();
+        }
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+            objectOutputStream.writeObject(t);
+        }
+        byte[] bytes = bos.toByteArray();
+        output.write(AsyncServer.CALL);
+        output.writeInt(bytes.length);
+        output.write(bytes);
+        output.flush();
+
+        int status = input.read();
+
+        //TODO clean this up, never return null
+        if (status == AsyncServer.OOM) {
+            filesProcessed++;
+            LOG.warn("{} oom", t.getFetchKey().getFetchKey());
+            return null;
+        } else if (status != AsyncServer.READY) {
+            throw new IOException("problem reading response from server " + status);
+        }
+        int length = input.readInt();
+        bytes = new byte[length];
+        input.readFully(bytes);
+        try (ObjectInputStream objectInputStream =
+                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            EmitData emitData = (EmitData) objectInputStream.readObject();
+            filesProcessed++;
+            return emitData;
+        } catch (ClassNotFoundException e) {
+            //this should be catastrophic
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Kills any existing child and starts a fresh {@link AsyncServer} process; the child's
+     * stderr is inherited so its stack traces show up in this process's stderr.
+     */
+    private void restart() throws IOException {
+        if (process != null) {
+            process.destroyForcibly();
+        }
+        LOG.debug("restarting process");
+        ProcessBuilder pb = new ProcessBuilder(getCommandline());
+        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
+        process = pb.start();
+        input = new DataInputStream(process.getInputStream());
+        output = new DataOutputStream(process.getOutputStream());
+    }
+
+    /**
+     * @return java command line for the child: config path, parse timeout, wait timeout
+     */
+    private String[] getCommandline() {
+        //TODO: make this all configurable
+        return new String[]{
+                "java",
+                "-cp",
+                System.getProperty("java.class.path"),
+                "org.apache.tika.pipes.async.AsyncServer",
+                ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()),
+                Long.toString(parseTimeoutMillis),
+                Long.toString(waitTimeoutMillis),
+        };
+    }
+}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
similarity index 64%
copy from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
copy to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
index 875cc90..c8b8ccb 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
@@ -16,12 +16,23 @@
*/
package org.apache.tika.pipes.async;
-/**
- * Fatal exception that means that something went seriously wrong.
- */
-public class AsyncRuntimeException extends RuntimeException {
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Settings holder, presumably for the async client/worker pool (TODO confirm).
+ * NOTE(review): work in progress -- none of these fields is read or written anywhere in
+ * this class yet, and {@link #load(Path)} does not parse anything.
+ */
+class AsyncClientConfig {
+
+ private int fetchQueueSize = 20000;
+ private int numWorkers = 10;
+ private String[] workerJVMArgs;
+ private long parseTimeoutMs;
+ private long waitTimeoutMs;
+ private long maxFilesProcessed;
- public AsyncRuntimeException(Throwable t) {
- super(t);
+ /**
+ * Stub loader.
+ *
+ * @param p config file path; currently ignored
+ * @return a new instance holding only the field defaults
+ * @throws IOException declared for future use; the current body never throws it
+ */
+ public static AsyncClientConfig load(Path p) throws IOException {
+ AsyncClientConfig asyncConfig = new AsyncClientConfig();
+
+ return asyncConfig;
}
+
+
}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
similarity index 69%
copy from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
copy to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index 875cc90..d16dd16 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -16,12 +16,21 @@
*/
package org.apache.tika.pipes.async;
-/**
- * Fatal exception that means that something went seriously wrong.
- */
-public class AsyncRuntimeException extends RuntimeException {
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Top-level async settings, grouping client, retry-client and emitter configs.
+ * NOTE(review): work in progress -- the sub-configs are never populated, so every
+ * instance returned by {@link #load(Path)} has all three fields null.
+ */
+class AsyncConfig {
+
+
+ private AsyncClientConfig asyncClientConfig;
+ private AsyncClientConfig asyncRetryClientConfig;
+ private AsyncEmitterConfig asyncEmitterConfig;
- public AsyncRuntimeException(Throwable t) {
- super(t);
+ /**
+ * Stub loader.
+ *
+ * @param p config file path; currently ignored
+ * @return a new, empty instance
+ * @throws IOException declared for future use; the current body never throws it
+ */
+ public static AsyncConfig load(Path p) throws IOException {
+ AsyncConfig asyncConfig = new AsyncConfig();
+
+ return asyncConfig;
}
+
+
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
similarity index 88%
rename from tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
rename to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index 054a1d2..fa53764 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.server.core.resource;
+package org.apache.tika.pipes.async;
import java.io.IOException;
import java.time.Instant;
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.utils.ExceptionUtils;
@@ -42,6 +43,9 @@ import org.apache.tika.utils.ExceptionUtils;
*/
public class AsyncEmitter implements Callable<Integer> {
+ static final EmitData EMIT_DATA_STOP_SEMAPHORE = new EmitData(null, null);
+ static final int EMITTER_FUTURE_CODE = 2;
+
private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
//TODO -- need to configure these
@@ -49,12 +53,14 @@ public class AsyncEmitter implements Callable<Integer> {
private long maxEstimatedBytes = 10_000_000;
+ private final EmitterManager emitterManager;
private final ArrayBlockingQueue<EmitData> emitDataQueue;
Instant lastEmitted = Instant.now();
- public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData) {
+ public AsyncEmitter(ArrayBlockingQueue<EmitData> emitData, EmitterManager emitterManager) {
this.emitDataQueue = emitData;
+ this.emitterManager = emitterManager;
}
@Override
@@ -63,6 +69,10 @@ public class AsyncEmitter implements Callable<Integer> {
while (true) {
EmitData emitData = emitDataQueue.poll(100, TimeUnit.MILLISECONDS);
+ if (emitData == EMIT_DATA_STOP_SEMAPHORE) {
+ cache.emitAll();
+ return EMITTER_FUTURE_CODE;
+ }
if (emitData != null) {
//this can block on emitAll
cache.add(emitData);
@@ -116,11 +126,11 @@ public class AsyncEmitter implements Callable<Integer> {
int emitted = 0;
LOG.debug("about to emit {}", size);
for (Map.Entry<String, List<EmitData>> e : map.entrySet()) {
- Emitter emitter = TikaResource.getConfig()
- .getEmitterManager().getEmitter(e.getKey());
+ Emitter emitter = emitterManager.getEmitter(e.getKey());
tryToEmit(emitter, e.getValue());
emitted += e.getValue().size();
}
+
LOG.debug("emitted: {}", emitted);
estimatedSize = 0;
size = 0;
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
similarity index 85%
rename from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
rename to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
index 502c8b4..ed6973c 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitHook.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitterConfig.java
@@ -16,9 +16,12 @@
*/
package org.apache.tika.pipes.async;
-public interface AsyncEmitHook {
+/**
+ * Settings holder for the emitters. NOTE(review): work in progress -- the fields have no
+ * accessors and nothing reads them yet; presumably they are meant to replace the
+ * hard-coded emit thresholds in AsyncEmitter (TODO confirm).
+ */
+class AsyncEmitterConfig {
+
+ private long emitWithinMs;
+ private long emitMaxEstimatedBytes;
+ private final int numEmitters = 1;
+
- void onSuccess(AsyncTask task);
- void onFail(AsyncTask task);
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
new file mode 100644
index 0000000..0f81bf7
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.xml.sax.SAXException;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitterManager;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+
+/**
+ * This is the main class for handling async requests. This manages
+ * AsyncClients and AsyncEmitters.
+ *
+ * The AsyncClient and AsyncServer communicate over the AsyncServer's
+ * STDOUT: AsyncServer keeps the original System.out for the protocol and redirects
+ * System.out to STDERR (an unconfigured log4j 2 writes to STDOUT), while AsyncClient
+ * reads the child's standard output and inherits its STDERR. If you configure logging
+ * for the AsyncServer, DO NOT write to STDOUT.
+ */
+public class AsyncProcessor implements Closeable {
+
+    static final int PARSER_FUTURE_CODE = 1;
+    private final Path tikaConfigPath;
+    private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
+    private final ArrayBlockingQueue<EmitData> emitData;
+    private final ExecutorCompletionService<Integer> executorCompletionService;
+    private final ExecutorService executorService;
+    private final int fetchEmitTupleQSize = 1000;
+    private int numParserThreads = 10;
+    private int numEmitterThreads = 2;
+    //guards numParserThreadsFinished, emitterSemaphoresAdded and finished:
+    //checkActive() is called both by offer() callers and by the worker threads.
+    //A dedicated lock (not "this") so workers never wait on an offer() that is
+    //sleeping while it holds the monitor.
+    private final Object activeLock = new Object();
+    private int numParserThreadsFinished = 0;
+    private int emitterSemaphoresAdded = 0;
+    private int finished = 0;
+    volatile boolean isShuttingDown = false;
+
+    /**
+     * Loads the emitters from the config and starts the parser workers and emitters.
+     *
+     * @param tikaConfigPath tika-config used here and by every forked AsyncServer
+     */
+    public AsyncProcessor(Path tikaConfigPath) throws TikaException, IOException, SAXException {
+        this.tikaConfigPath = tikaConfigPath;
+        this.fetchEmitTuples = new ArrayBlockingQueue<>(fetchEmitTupleQSize);
+        this.emitData = new ArrayBlockingQueue<>(100);
+        //load the config before starting any threads so that a bad config doesn't
+        //leave an executor full of already-running workers behind
+        EmitterManager emitterManager = new TikaConfig(tikaConfigPath).getEmitterManager();
+        this.executorService = Executors.newFixedThreadPool(numParserThreads + numEmitterThreads);
+        this.executorCompletionService =
+                new ExecutorCompletionService<>(executorService);
+        for (int i = 0; i < numParserThreads; i++) {
+            executorCompletionService.submit(new FetchEmitWorker(tikaConfigPath, fetchEmitTuples,
+                    emitData));
+        }
+        for (int i = 0; i < numEmitterThreads; i++) {
+            executorCompletionService.submit(new AsyncEmitter(emitData, emitterManager));
+        }
+    }
+
+    /**
+     * Adds all of the tuples or none of them, waiting up to offerMs for enough room.
+     * The queue is tried at least once, even when offerMs is zero or negative.
+     *
+     * @return true if the whole list was queued, false if it timed out
+     * @throws OfferLargerThanQueueSize if the list could never fit in the queue
+     * @throws IllegalStateException    if called after {@link #close()}
+     */
+    public synchronized boolean offer(List<FetchEmitTuple> newFetchEmitTuples, long offerMs)
+            throws AsyncRuntimeException, InterruptedException {
+        if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
+        }
+        if (newFetchEmitTuples.size() > fetchEmitTupleQSize) {
+            throw new OfferLargerThanQueueSize(newFetchEmitTuples.size(),
+                    fetchEmitTupleQSize);
+        }
+        long start = System.currentTimeMillis();
+        while (true) {
+            //>= so that a list exactly as large as the free space (including a list as
+            //large as the whole queue, which the check above allows) can be accepted
+            if (fetchEmitTuples.remainingCapacity() >= newFetchEmitTuples.size()) {
+                //all producers go through the synchronized offer methods and the workers
+                //only remove, so capacity can only grow between the check and addAll
+                fetchEmitTuples.addAll(newFetchEmitTuples);
+                return true;
+            }
+            if (System.currentTimeMillis() - start >= offerMs) {
+                return false;
+            }
+            Thread.sleep(100);
+        }
+    }
+
+    /**
+     * @return remaining capacity of the fetch/emit tuple queue
+     */
+    public int getCapacity() {
+        return fetchEmitTuples.remainingCapacity();
+    }
+
+    /**
+     * Queues a single tuple, waiting up to offerMs for room.
+     *
+     * @return true if queued, false if it timed out
+     * @throws IllegalStateException if called after {@link #close()}
+     */
+    public synchronized boolean offer(FetchEmitTuple t, long offerMs)
+            throws AsyncRuntimeException, InterruptedException {
+        if (isShuttingDown) {
+            throw new IllegalStateException(
+                    "Can't call offer after calling close() or " + "shutdownNow()");
+        }
+        checkActive();
+        return fetchEmitTuples.offer(t, offerMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Collects at most one finished worker/emitter future. Once all parser workers have
+     * returned (each consumes one COMPLETED_SEMAPHORE), one stop semaphore per emitter is
+     * queued so the emitters flush and exit.
+     *
+     * @return true while any parser worker or emitter has not finished yet
+     * @throws RuntimeException wrapping the failure of a worker or emitter
+     */
+    public boolean checkActive() {
+        synchronized (activeLock) {
+            Future<Integer> future = executorCompletionService.poll();
+            if (future != null) {
+                try {
+                    Integer i = future.get();
+                    if (i == PARSER_FUTURE_CODE) {
+                        numParserThreadsFinished++;
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+                finished++;
+            }
+            if (numParserThreadsFinished == numParserThreads) {
+                //offer() doesn't block; if the emit queue is full, the remaining stop
+                //semaphores are added on a later call instead of being silently lost
+                while (emitterSemaphoresAdded < numEmitterThreads &&
+                        emitData.offer(AsyncEmitter.EMIT_DATA_STOP_SEMAPHORE)) {
+                    emitterSemaphoresAdded++;
+                }
+            }
+            return finished != (numEmitterThreads + numParserThreads);
+        }
+    }
+
+    /**
+     * Rejects further offers and interrupts all workers and emitters.
+     */
+    @Override
+    public void close() throws IOException {
+        //volatile write rather than synchronized, so close() isn't stuck behind an
+        //offer() that is waiting for capacity
+        isShuttingDown = true;
+        executorService.shutdownNow();
+    }
+
+    /**
+     * Pulls tuples off the shared queue, parses each one through its own AsyncClient and
+     * hands the results to the emitters, until it receives a COMPLETED_SEMAPHORE.
+     */
+    private class FetchEmitWorker implements Callable<Integer> {
+
+        private final Path tikaConfigPath;
+        private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
+        private final ArrayBlockingQueue<EmitData> emitDataQueue;
+
+        private FetchEmitWorker(Path tikaConfigPath,
+                                ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples,
+                                ArrayBlockingQueue<EmitData> emitDataQueue) {
+            this.tikaConfigPath = tikaConfigPath;
+            this.fetchEmitTuples = fetchEmitTuples;
+            this.emitDataQueue = emitDataQueue;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            try (AsyncClient asyncClient = new AsyncClient(tikaConfigPath)) {
+                while (true) {
+                    FetchEmitTuple t = fetchEmitTuples.poll(1, TimeUnit.SECONDS);
+                    if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                        return PARSER_FUTURE_CODE;
+                    } else if (t != null) {
+                        EmitData emitData;
+                        try {
+                            emitData = asyncClient.process(t);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                            continue;
+                        }
+                        if (emitData != null) {
+                            //put() waits for room; offer() silently dropped the parse
+                            //result whenever the emit queue was full
+                            //TODO -- add timeout, this can hang forever if the emitters die
+                            emitDataQueue.put(emitData);
+                        }
+                    }
+                    checkActive();
+                }
+            }
+        }
+    }
+
+}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
similarity index 100%
copy from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
copy to tika-core/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
new file mode 100644
index 0000000..cc1e14b
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncServer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PrintStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.xml.sax.SAXException;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.EncryptedDocumentException;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.parser.RecursiveParserWrapper;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.Fetcher;
+import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.RecursiveParserWrapperHandler;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Child-process side of the async protocol: reads one-byte requests (plus payloads) from
+ * stdin and writes one-byte statuses (plus payloads) to the original stdout. A daemon
+ * watchdog thread exits the JVM on parse timeout or on inactivity.
+ */
+public class AsyncServer implements Runnable {
+
+    /**
+     * Reply to {@link #CALL} when the tuple was read but the fetch failed; the server keeps
+     * running. It is written with writeByte, so a client's InputStream.read() sees 255.
+     */
+    public static final byte ERROR = -1;
+
+    public static final byte DONE = 0;
+
+    /** Request: an int length and a java-serialized FetchEmitTuple follow. */
+    public static final byte CALL = 1;
+
+    /** Request and reply: liveness check, echoed back unchanged. */
+    public static final byte PING = 2;
+
+    public static final byte RESOURCE = 3;
+
+    /** Reply: an int length and a java-serialized EmitData follow. */
+    public static final byte READY = 4;
+
+    /** Reply written once if the parser can't be initialized; the server then stops. */
+    public static final byte FAILED_TO_START = 5;
+
+    /** Reply written when parsing hit an OutOfMemoryError; the server then exits. */
+    public static final byte OOM = 6;
+
+    public static final byte TIMEOUT = 7;
+
+    //guards the parsing/since pair that the watchdog reads
+    private final Object[] lock = new Object[0];
+    private final Path tikaConfigPath;
+    private final DataInputStream input;
+    private final DataOutputStream output;
+    private final long serverParseTimeoutMillis;
+    private final long serverWaitTimeoutMillis;
+    private Parser parser;
+    private TikaConfig tikaConfig;
+    private volatile boolean parsing;
+    private volatile long since;
+
+    //logging is fussy...the logging frameworks grab stderr/stdout
+    //before we can redirect. slf4j complains on stderr, log4j2 unconfigured writes to stdout
+    //We can add logging later but it has to be done carefully...
+    public AsyncServer(Path tikaConfigPath, InputStream in, PrintStream out,
+                       long serverParseTimeoutMillis, long serverWaitTimeoutMillis)
+            throws IOException, TikaException, SAXException {
+        this.tikaConfigPath = tikaConfigPath;
+        this.input = new DataInputStream(in);
+        this.output = new DataOutputStream(out);
+        this.serverParseTimeoutMillis = serverParseTimeoutMillis;
+        this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
+        this.parsing = false;
+        this.since = System.currentTimeMillis();
+    }
+
+    /**
+     * Args: tika-config path, parse timeout (ms), wait timeout (ms).
+     */
+    public static void main(String[] args) throws Exception {
+        Path tikaConfig = Paths.get(args[0]);
+        long serverParseTimeoutMillis = Long.parseLong(args[1]);
+        long serverWaitTimeoutMillis = Long.parseLong(args[2]);
+
+        AsyncServer server =
+                new AsyncServer(tikaConfig, System.in, System.out, serverParseTimeoutMillis,
+                        serverWaitTimeoutMillis);
+        //the server keeps the real stdin/stdout for the protocol; anything else that
+        //prints to System.out now goes to stderr instead of corrupting the protocol
+        System.setIn(new ByteArrayInputStream(new byte[0]));
+        System.setOut(System.err);
+
+        Thread watchdog = new Thread(server, "Tika Watchdog");
+        watchdog.setDaemon(true);
+        watchdog.start();
+
+        server.processRequests();
+    }
+
+    /**
+     * Watchdog loop: exits with 1 if a parse runs longer than the parse timeout, or with 0
+     * after the wait timeout (when positive) elapses without a request.
+     */
+    public void run() {
+        try {
+            while (true) {
+                synchronized (lock) {
+                    long elapsed = System.currentTimeMillis() - since;
+                    if (parsing && elapsed > serverParseTimeoutMillis) {
+                        //LOG.error("timeout");
+                        System.exit(1);
+                    } else if (!parsing && serverWaitTimeoutMillis > 0 &&
+                            elapsed > serverWaitTimeoutMillis) {
+                        //LOG.info("closing down from inactivity");
+                        System.exit(0);
+                    }
+                }
+                Thread.sleep(250);
+            }
+        } catch (InterruptedException e) {
+            //swallow
+        }
+    }
+
+    /**
+     * Initializes the parser, then serves requests until end of input. Any unexpected
+     * Throwable ends the loop (and, once main returns, the process).
+     */
+    public void processRequests() {
+        //initialize
+        try {
+            initializeParser();
+        } catch (Throwable t) {
+            t.printStackTrace();
+            System.err.flush();
+            try {
+                output.writeByte(FAILED_TO_START);
+                output.flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+                System.err.flush();
+            }
+            return;
+        }
+        //main loop
+        try {
+            while (true) {
+                int request = input.read();
+                if (request == -1) {
+                    break;
+                } else if (request == PING) {
+                    output.writeByte(PING);
+                    output.flush();
+                } else if (request == CALL) {
+                    EmitData emitData;
+                    try {
+                        emitData = parseOne();
+                    } catch (FetchException e) {
+                        //parseOne only throws this after the whole request has been read,
+                        //so the stream is still in sync: report the failure and keep
+                        //serving instead of forcing the client to restart a JVM
+                        e.printStackTrace();
+                        System.err.flush();
+                        output.writeByte(ERROR);
+                        output.flush();
+                        continue;
+                    }
+                    write(emitData);
+                } else {
+                    throw new IllegalStateException("Unexpected request: " + request);
+                }
+                output.flush();
+            }
+        } catch (Throwable t) {
+            t.printStackTrace();
+        }
+        System.err.flush();
+    }
+
+    /**
+     * Writes READY, the payload length and the serialized emit data; exits on failure.
+     */
+    private void write(EmitData emitData) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
+                objectOutputStream.writeObject(emitData);
+            }
+            byte[] bytes = bos.toByteArray();
+            int len = bytes.length;
+            output.write(READY);
+            output.writeInt(len);
+            output.write(bytes);
+            output.flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+            //LOG.error("problem writing emit data", e);
+            exit(1);
+        }
+    }
+
+    /**
+     * Reads one tuple, fetches and parses it, and merges in the user metadata.
+     *
+     * @throws FetchException if the fetcher can't be found, or fetching or closing
+     *                        the stream fails; the request has been fully read by then
+     */
+    private EmitData parseOne() throws FetchException {
+        synchronized (lock) {
+            parsing = true;
+            since = System.currentTimeMillis();
+        }
+        try {
+            FetchEmitTuple t = readFetchEmitTuple();
+            Metadata userMetadata = t.getMetadata();
+            Metadata metadata = new Metadata();
+            String fetcherName = t.getFetchKey().getFetcherName();
+            String fetchKey = t.getFetchKey().getFetchKey();
+            List<Metadata> metadataList = null;
+            Fetcher fetcher = null;
+            try {
+                fetcher = tikaConfig.getFetcherManager().getFetcher(fetcherName);
+            } catch (TikaException | IOException e) {
+                //LOG.error("can't get fetcher", e);
+                throw new FetchException(e);
+            }
+
+            try (InputStream stream = fetcher.fetch(fetchKey, metadata)) {
+                metadataList = parseMetadata(t, stream, metadata);
+            } catch (SecurityException e) {
+                throw e;
+            } catch (TikaException | IOException e) {
+                //LOG.error("problem reading from fetcher", e);
+                throw new FetchException(e);
+            } catch (OutOfMemoryError e) {
+                //LOG.error("oom", e);
+                handleOOM(e);
+            }
+
+            injectUserMetadata(userMetadata, metadataList);
+            EmitKey emitKey = t.getEmitKey();
+            if (StringUtils.isBlank(emitKey.getEmitKey())) {
+                emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
+                t.setEmitKey(emitKey);
+            }
+            return new EmitData(t.getEmitKey(), metadataList);
+        } finally {
+            synchronized (lock) {
+                parsing = false;
+                since = System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Best-effort OOM status to the client, then exit(1).
+     */
+    private void handleOOM(OutOfMemoryError oom) {
+        try {
+            output.writeByte(OOM);
+            output.flush();
+        } catch (IOException e) {
+            //swallow at this point
+        }
+        exit(1);
+    }
+
+    /**
+     * Parses with the recursive wrapper. Parse exceptions other than Security/OOM are
+     * swallowed so that whatever was extracted so far is still returned.
+     */
+    private List<Metadata> parseMetadata(FetchEmitTuple fetchEmitTuple, InputStream stream,
+                                         Metadata metadata) {
+        //make these configurable
+        BasicContentHandlerFactory.HANDLER_TYPE type = BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+
+
+        RecursiveParserWrapperHandler handler =
+                new RecursiveParserWrapperHandler(new BasicContentHandlerFactory(type,
+                        fetchEmitTuple.getHandlerConfig().getWriteLimit()),
+                        fetchEmitTuple.getHandlerConfig().getMaxEmbeddedResources(),
+                        tikaConfig.getMetadataFilter());
+        ParseContext parseContext = new ParseContext();
+        FetchKey fetchKey = fetchEmitTuple.getFetchKey();
+        try {
+            parser.parse(stream, handler, metadata, parseContext);
+        } catch (SAXException e) {
+            //LOG.warn("problem:" + fetchKey.getFetchKey(), e);
+        } catch (EncryptedDocumentException e) {
+            //LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
+        } catch (SecurityException e) {
+            //LOG.warn("security exception: " + fetchKey.getFetchKey());
+            throw e;
+        } catch (Exception e) {
+            //LOG.warn("exception: " + fetchKey.getFetchKey());
+        } catch (OutOfMemoryError e) {
+            //TODO, maybe return file type gathered so far and then crash?
+            //LOG.error("oom: " + fetchKey.getFetchKey());
+            throw e;
+        }
+        return handler.getMetadataList();
+    }
+
+    /**
+     * Overwrites each user-supplied field on the container (first) metadata object.
+     */
+    private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
+        //nothing to inject into (or from); without this guard get(0) on an empty list
+        //would throw and end the request loop
+        if (userMetadata == null || metadataList == null || metadataList.isEmpty()) {
+            return;
+        }
+        Metadata containerMetadata = metadataList.get(0);
+        for (String n : userMetadata.names()) {
+            //overwrite whatever was there
+            containerMetadata.set(n, null);
+            for (String val : userMetadata.getValues(n)) {
+                containerMetadata.add(n, val);
+            }
+        }
+    }
+
+
+    private void exit(int exitCode) {
+        System.exit(exitCode);
+    }
+
+
+    /**
+     * Reads an int length plus a serialized tuple; exits the JVM if that fails.
+     */
+    private FetchEmitTuple readFetchEmitTuple() {
+        try {
+            int length = input.readInt();
+            byte[] bytes = new byte[length];
+            input.readFully(bytes);
+            try (ObjectInputStream objectInputStream =
+                         new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+                return (FetchEmitTuple) objectInputStream.readObject();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            System.err.flush();
+            //LOG.error("problem reading tuple", e);
+            System.exit(1);
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+            System.err.flush();
+            //LOG.error("can't find class?!", e);
+            System.exit(1);
+        }
+        //unreachable, no?!
+        return null;
+    }
+
+    private void initializeParser() throws TikaException, IOException, SAXException {
+        //TODO allowed named configurations in tika config
+        this.tikaConfig = new TikaConfig(tikaConfigPath);
+        Parser autoDetectParser = new AutoDetectParser(this.tikaConfig);
+        this.parser = new RecursiveParserWrapper(autoDetectParser);
+
+    }
+
+    /**
+     * Fetch-stage failure; the request stream is still in sync when this is thrown.
+     */
+    private static class FetchException extends IOException {
+        FetchException(Throwable t) {
+            super(t);
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java b/tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
similarity index 58%
rename from tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
rename to tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
index 875cc90..da96c80 100644
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncRuntimeException.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/OfferLargerThanQueueSize.java
@@ -16,12 +16,26 @@
*/
package org.apache.tika.pipes.async;
-/**
- * Fatal exception that means that something went seriously wrong.
- */
-public class AsyncRuntimeException extends RuntimeException {
+/**
+ * Signals that an offer's size is greater than the size of the queue it was offered to.
+ * Both sizes are reported in {@link #getMessage()} and exposed through
+ * {@link #getSizeOffered()} and {@link #getQueueSize()}.
+ */
+public class OfferLargerThanQueueSize extends IllegalArgumentException {
+ private final int sizeOffered;
+ private final int queueSize;
+
+ /**
+ * @param sizeOffered the size of the rejected offer
+ * @param queueSize the size of the queue the offer was made to
+ */
+ public OfferLargerThanQueueSize(int sizeOffered, int queueSize) {
+ this.sizeOffered = sizeOffered;
+ this.queueSize = queueSize;
+ }
+
+ /**
+ * Builds the message from the two sizes on each call; no message is passed to the
+ * superclass constructor.
+ */
+ @Override
+ public String getMessage() {
+ return "sizeOffered (" + sizeOffered + ") is greater than queue size (" +
+ queueSize + ")";
+ }
+
+ /** @return the size of the queue the offer was made to */
+ public int getQueueSize() {
+ return queueSize;
+ }
- public AsyncRuntimeException(Throwable t) {
- super(t);
+ /** @return the size of the rejected offer */
+ public int getSizeOffered() {
+ return sizeOffered;
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
index b3fec7f..797de58 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
@@ -16,11 +16,16 @@
*/
package org.apache.tika.pipes.emitter;
+import java.io.Serializable;
import java.util.List;
import org.apache.tika.metadata.Metadata;
-public class EmitData {
+public class EmitData implements Serializable {
+ /**
+ * Serial version UID
+ */
+ private static final long serialVersionUID = -3861669115439125268L;
private final EmitKey emitKey;
private final List<Metadata> metadataList;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
index b565a43..08d798d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitKey.java
@@ -16,7 +16,14 @@
*/
package org.apache.tika.pipes.emitter;
-public class EmitKey {
+import java.io.Serializable;
+
+public class EmitKey implements Serializable {
+
+ /**
+ * Serial version UID
+ */
+ private static final long serialVersionUID = -3861669115439125268L;
private String emitterName;
private String emitKey;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
index 2c0ea64..090001e 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/FetchKey.java
@@ -16,11 +16,18 @@
*/
package org.apache.tika.pipes.fetcher;
+import java.io.Serializable;
+
/**
* Pair of fetcherName (which fetcher to call) and the key
* to send to that fetcher to retrieve a specific file.
*/
-public class FetchKey {
+public class FetchKey implements Serializable {
+ /**
+ * Serial version UID
+ */
+ private static final long serialVersionUID = -3861669115439125268L;
+
private String fetcherName;
private String fetchKey;
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
similarity index 52%
rename from tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
rename to tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 2408a63..7fbfef8 100644
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -17,18 +17,14 @@
package org.apache.tika.pipes.async;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.HashSet;
+import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
@@ -38,68 +34,79 @@ import org.junit.Test;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.utils.ProcessUtils;
public class AsyncProcessorTest {
- private Path dbDir;
- private Path dbFile;
- private Connection connection;
+ private final String OOM = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
+ "<throw class=\"java.lang.OutOfMemoryError\">oom message</throw>\n</mock>";
+ private final String OK = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<mock>" +
+ "<metadata action=\"add\" name=\"dc:creator\">Nikolai Lobachevsky</metadata>" +
+ "<write element=\"p\">main_content</write>" + "</mock>";
private Path tikaConfigPath;
+ private Path inputDir;
+ private final int totalFiles = 100;
+ private int ok = 0;
+ private int oom = 0;
@Before
public void setUp() throws SQLException, IOException {
- dbDir = Files.createTempDirectory("async-db");
- dbFile = dbDir.resolve("emitted-db");
- String jdbc = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
- String sql = "create table emitted (id int auto_increment primary key, " +
- "emitkey varchar(2000), json varchar(20000))";
-
- connection = DriverManager.getConnection(jdbc);
- connection.createStatement().execute(sql);
- tikaConfigPath = dbDir.resolve("tika-config.xml");
+ inputDir = Files.createTempDirectory("tika-async-");
+ tikaConfigPath = Files.createTempFile("tika-config-", ".xml");
String xml = "" + "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" +
" <emitters>" + " <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" +
" <params>\n" + " <param name=\"name\" type=\"string\">mock</param>\n" +
- " <param name=\"jdbc\" type=\"string\">" + jdbc + "</param>\n" +
" </params>" + " </emitter>" + " </emitters>" + " <fetchers>" +
- " <fetcher class=\"org.apache.tika.pipes.async.MockFetcher\">" +
- " <param name=\"name\" type=\"string\">mock</param>\n" + " </fetcher>" +
+ " <fetcher class=\"org.apache.tika.pipes.fetcher.FileSystemFetcher\">" +
+ " <params><param name=\"name\" type=\"string\">mock</param>\n" +
+ " <param name=\"basePath\" type=\"string\">" +
+ ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString())
+ + "</param></params>\n" +
+ " </fetcher>" +
" </fetchers>" + "</properties>";
Files.write(tikaConfigPath, xml.getBytes(StandardCharsets.UTF_8));
+ Random r = new Random();
+ for (int i = 0; i < totalFiles; i++) {
+ if (r.nextFloat() < 0.1) {
+ Files.write(inputDir.resolve(i + ".xml"), OOM.getBytes(StandardCharsets.UTF_8));
+ oom++;
+ } else {
+ Files.write(inputDir.resolve(i + ".xml"), OK.getBytes(StandardCharsets.UTF_8));
+ ok++;
+ }
+ }
}
@After
public void tearDown() throws SQLException, IOException {
- connection.createStatement().execute("drop table emitted");
- connection.close();
- FileUtils.deleteDirectory(dbDir.toFile());
+ Files.delete(tikaConfigPath);
+ FileUtils.deleteDirectory(inputDir.toFile());
}
@Test
public void testBasic() throws Exception {
-
-
- AsyncProcessor processor = AsyncProcessor.build(tikaConfigPath);
- int max = 100;
- for (int i = 0; i < max; i++) {
- FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", "key-" + i),
+ AsyncProcessor processor = new AsyncProcessor(tikaConfigPath);
+ for (int i = 0; i < totalFiles; i++) {
+ FetchEmitTuple t = new FetchEmitTuple(new FetchKey("mock", i + ".xml"),
new EmitKey("mock", "emit-" + i), new Metadata());
processor.offer(t, 1000);
}
+ for (int i = 0; i < 10; i++) {
+ processor.offer(FetchIterator.COMPLETED_SEMAPHORE, 1000);
+ }
+ //TODO clean this up
+ while (processor.checkActive()) {
+ Thread.sleep(100);
+ }
processor.close();
- String sql = "select emitkey from emitted";
Set<String> emitKeys = new HashSet<>();
- try (Statement st = connection.createStatement(); ResultSet rs = st.executeQuery(sql)) {
- while (rs.next()) {
- String emitKey = rs.getString(1);
- emitKeys.add(emitKey);
- }
- }
- assertEquals(max, emitKeys.size());
- for (int i = 0; i < max; i++) {
- assertTrue(emitKeys.contains("emit-" + i));
+ for (EmitData d : MockEmitter.EMIT_DATA) {
+ emitKeys.add(d.getEmitKey().getEmitKey());
}
+ assertEquals(ok, emitKeys.size());
}
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
new file mode 100644
index 0000000..1672b33
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.emitter.Emitter;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+
+/**
+ * Test {@link Emitter} that records everything it is asked to emit in the static
+ * {@link #EMIT_DATA} queue, so that tests can inspect it afterwards.
+ */
+public class MockEmitter implements Emitter {
+
+ /**
+ * Everything emitted by any MockEmitter in this JVM, in arrival order. The queue is static
+ * and shared, so tests should clear it before use. Capacity is 10000 items.
+ */
+ static final ArrayBlockingQueue<EmitData> EMIT_DATA = new ArrayBlockingQueue<>(10000);
+
+ public MockEmitter() {
+ }
+
+ @Override
+ public String getName() {
+ return "mock";
+ }
+
+ /**
+ * Wraps a single document in an {@link EmitData} keyed by this emitter's name and
+ * delegates to {@link #emit(List)}.
+ */
+ @Override
+ public void emit(String emitKey, List<Metadata> metadataList)
+ throws IOException, TikaEmitterException {
+ emit(Collections
+ .singletonList(new EmitData(new EmitKey(getName(), emitKey), metadataList)));
+ }
+
+ /**
+ * Appends each item to {@link #EMIT_DATA}.
+ *
+ * @throws IOException if {@link #EMIT_DATA} is full and an item cannot be recorded
+ */
+ @Override
+ public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
+ for (EmitData d : emitData) {
+ if (!EMIT_DATA.offer(d)) {
+ //fail loudly: a silently dropped item would make tests undercount what was emitted
+ throw new IOException("MockEmitter queue is full");
+ }
+ }
+ }
+
+ /**
+ * @return a snapshot copy of everything recorded so far
+ */
+ public static List<EmitData> getData() {
+ return new ArrayList<>(EMIT_DATA);
+ }
+
+}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
similarity index 100%
rename from tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
rename to tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 97eed63..8614090 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -36,7 +36,6 @@
<module>tika-fetchers</module>
<module>tika-emitters</module>
<module>tika-fetch-iterators</module>
- <module>tika-pipes-async</module>
<module>tika-pipes-integration-tests</module>
</modules>
diff --git a/tika-pipes/tika-pipes-async/pom.xml b/tika-pipes/tika-pipes-async/pom.xml
deleted file mode 100644
index 08d1ff2..0000000
--- a/tika-pipes/tika-pipes-async/pom.xml
+++ /dev/null
@@ -1,101 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.tika</groupId>
- <artifactId>tika-pipes</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>tika-pipes-async</artifactId>
- <packaging>pom</packaging>
- <name>Apache Tika emitters</name>
- <url>https://tika.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-serialization</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>${commons.io.version}</version>
- </dependency>
- <dependency>
- <groupId>com.h2database</groupId>
- <artifactId>h2</artifactId>
- <version>${h2.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>net.jpountz.lz4</groupId>
- <artifactId>lz4</artifactId>
- <version>1.3.0</version>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-emitter-fs</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-core</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <!--
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-parsers-classic-package</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>-->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
deleted file mode 100644
index 4321c2c..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncCli.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.EmptyFetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-
-public class AsyncCli {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncCli.class);
-
- public static void main(String[] args) throws Exception {
- Path configPath = Paths.get(args[0]);
- int maxConsumers = 20;
- AsyncCli asyncCli = new AsyncCli();
- Path dbDir = Files.createTempDirectory("tika-async-db-");
- try {
- asyncCli.execute(dbDir, configPath, maxConsumers);
- } finally {
- FileUtils.deleteDirectory(dbDir.toFile());
- }
-
- }
-
- private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
- PreparedStatement findActiveWorkers =
- connection.prepareStatement("select worker_id from workers");
- List<Integer> workers = new ArrayList<>();
- try (ResultSet rs = findActiveWorkers.executeQuery()) {
- while (rs.next()) {
- workers.add(rs.getInt(1));
- }
- }
- return workers;
- }
-
- private void execute(Path dbDir, Path configPath, int maxConsumers) throws Exception {
- TikaConfig tikaConfig = new TikaConfig(configPath);
-
- String connectionString = setupTables(dbDir);
-
- ExecutorService executorService = Executors.newFixedThreadPool(maxConsumers + 3);
- ExecutorCompletionService<Integer> executorCompletionService =
- new ExecutorCompletionService<>(executorService);
-
- try (Connection connection = DriverManager.getConnection(connectionString)) {
- FetchIterator fetchIterator = tikaConfig.getFetchIterator();
- if (fetchIterator instanceof EmptyFetchIterator) {
- throw new IllegalArgumentException("can't have empty fetch iterator");
- }
- ArrayBlockingQueue<FetchEmitTuple> q =
- new ArrayBlockingQueue<>(10000);//fetchIterator.init(maxConsumers);
- AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(q, connection);
- executorCompletionService.submit(fetchIterator);
- executorCompletionService.submit(enqueuer);
- executorCompletionService.submit(new AssignmentManager(connection, enqueuer));
-
- for (int i = 0; i < maxConsumers; i++) {
- executorCompletionService
- .submit(new AsyncWorker(connection, connectionString, i, configPath));
- }
- int completed = 0;
- while (completed < maxConsumers + 3) {
- Future<Integer> future = executorCompletionService.take();
- if (future != null) {
- int val = future.get();
- completed++;
- LOG.debug("finished " + val);
- }
- }
- } finally {
- executorService.shutdownNow();
- }
- }
-
- private String setupTables(Path dbDir) throws SQLException {
- Path dbFile = dbDir.resolve("tika-async");
- String url = "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE";
- Connection connection = DriverManager.getConnection(url);
-
- String sql = "create table task_queue " + "(id bigint auto_increment primary key," +
- "status tinyint," + //byte
- "worker_id integer," + "retry smallint," + //short
- "time_stamp timestamp," + "json varchar(64000))";
- connection.createStatement().execute(sql);
- //no clear benefit to creating an index on timestamp
-// sql = "CREATE INDEX IF NOT EXISTS status_timestamp on status (time_stamp)";
- sql = "create table workers (worker_id int primary key)";
- connection.createStatement().execute(sql);
-
- sql = "create table workers_shutdown (worker_id int primary key)";
- connection.createStatement().execute(sql);
-
- sql = "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
- "time_stamp timestamp," + "retry integer," + "error_code tinyint)";
- connection.createStatement().execute(sql);
-
- return url;
- }
-
- //this reads fetchemittuples from the queue and inserts them in the db
- //for the workers to read
- private static class AsyncTaskEnqueuer implements Callable<Integer> {
- private final PreparedStatement insert;
-
- private final ArrayBlockingQueue<FetchEmitTuple> queue;
- private final Connection connection;
- private final Random random = new Random();
-
- private volatile boolean isComplete = false;
-
- AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
- throws SQLException {
- this.queue = queue;
- this.connection = connection;
- String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
- "values (?,CURRENT_TIMESTAMP(),?,?,?)";
- insert = connection.prepareStatement(sql);
- }
-
- @Override
- public Integer call() throws Exception {
- List<Integer> workers = new ArrayList<>();
- while (true) {
- FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
- LOG.debug("enqueing to db " + t);
- if (t == null) {
- //log.trace?
- } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
- isComplete = true;
- return 1;
- } else {
- long start = System.currentTimeMillis();
- long elapsed = System.currentTimeMillis() - start;
- //TODO -- fix this
- while (workers.size() == 0 && elapsed < 600000) {
- workers = getActiveWorkers(connection);
- Thread.sleep(100);
- elapsed = System.currentTimeMillis() - start;
- }
- insert(t, workers);
- }
- }
- }
-
- boolean isComplete() {
- return isComplete;
- }
-
- private void insert(FetchEmitTuple t, List<Integer> workers)
- throws IOException, SQLException {
- int workerId = workers.size() == 1 ? workers.get(0) :
- workers.get(random.nextInt(workers.size()));
- insert.clearParameters();
- insert.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
- insert.setInt(2, workerId);
- insert.setShort(3, (short) 0);
- insert.setString(4, JsonFetchEmitTuple.toJson(t));
- insert.execute();
- }
- }
-
- private static class AssignmentManager implements Callable {
-
- private final Connection connection;
- private final AsyncTaskEnqueuer enqueuer;
- private final PreparedStatement getQueueDistribution;
- private final PreparedStatement findMissingWorkers;
- private final PreparedStatement allocateNonworkersToWorkers;
- private final PreparedStatement reallocate;
- private final PreparedStatement countAvailableTasks;
- private final PreparedStatement insertWorkersShutdown;
- private final Random random = new Random();
-
-
- public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer)
- throws SQLException {
- this.connection = connection;
- this.enqueuer = enqueuer;
- //this gets workers and # of tasks in desc order of number of tasks
- String sql = "select w.worker_id, p.cnt " + "from workers w " +
- "left join (select worker_id, count(1) as cnt from task_queue " +
- "where status=0 group by worker_id)" +
- " p on p.worker_id=w.worker_id order by p.cnt desc";
- getQueueDistribution = connection.prepareStatement(sql);
- //find workers that have assigned tasks but are not in the
- //workers table
- sql = "select p.worker_id, count(1) as cnt from task_queue p " +
- "left join workers w on p.worker_id=w.worker_id " +
- "where w.worker_id is null group by p.worker_id";
- findMissingWorkers = connection.prepareStatement(sql);
-
- sql = "update task_queue set worker_id=? where worker_id=?";
- allocateNonworkersToWorkers = connection.prepareStatement(sql);
-
- //current strategy reallocate tasks from longest queue to shortest
- //TODO: might consider randomly shuffling or other algorithms
- sql = "update task_queue set worker_id= ? where id in " +
- "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
- "and status=0 for update)";
- reallocate = connection.prepareStatement(sql);
-
- sql = "select count(1) from task_queue where status=" +
- AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal();
- countAvailableTasks = connection.prepareStatement(sql);
-
- sql = "insert into workers_shutdown (worker_id) values (?)";
- insertWorkersShutdown = connection.prepareStatement(sql);
- }
-
- @Override
- public Integer call() throws Exception {
-
- while (true) {
- List<Integer> missingWorkers = getMissingWorkers();
- reallocateFromMissingWorkers(missingWorkers);
- redistribute();
- if (isComplete()) {
- notifyWorkers();
- return 1;
- }
- Thread.sleep(100);
- }
- }
-
- private void notifyWorkers() throws SQLException {
- for (int workerId : getActiveWorkers(connection)) {
- insertWorkersShutdown.clearParameters();
- insertWorkersShutdown.setInt(1, workerId);
- insertWorkersShutdown.execute();
- }
- }
-
- private boolean isComplete() throws SQLException {
- if (!enqueuer.isComplete) {
- return false;
- }
- try (ResultSet rs = countAvailableTasks.executeQuery()) {
- while (rs.next()) {
- return rs.getInt(1) == 0;
- }
- }
- return false;
- }
-
- private void redistribute() throws SQLException {
- //parallel lists of workerid = task queue size
- List<Integer> workerIds = new ArrayList<>();
- List<Integer> queueSize = new ArrayList<>();
- int totalTasks = 0;
-
- try (ResultSet rs = getQueueDistribution.executeQuery()) {
- while (rs.next()) {
- int workerId = rs.getInt(1);
- int numTasks = rs.getInt(2);
- workerIds.add(workerId);
- queueSize.add(numTasks);
- LOG.debug("workerId: ({}) numTasks: ({})", workerId, numTasks);
- totalTasks += numTasks;
- }
- }
- if (workerIds.size() == 0) {
- return;
- }
- int averagePerWorker = Math.round((float) totalTasks / (float) workerIds.size());
- int midPoint = Math.round((float) queueSize.size() / 2) + 1;
- for (int i = queueSize.size() - 1, j = 0; i > midPoint && j < midPoint; i--, j++) {
- int shortestQueue = queueSize.get(i);
- int longestQueue = queueSize.get(j);
- if ((shortestQueue < 5 && longestQueue > 5) ||
- longestQueue > 5 && longestQueue > (int) (1.5 * averagePerWorker)) {
- int shortestQueueWorker = workerIds.get(i);
- int longestQueueWorker = workerIds.get(j);
- reallocate.clearParameters();
- reallocate.setLong(1, shortestQueueWorker);
- reallocate.setLong(2, longestQueueWorker);
- reallocate.execute();
- }
- }
-
- }
-
- private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
- throws SQLException {
-
- if (missingWorkers.size() == 0) {
- return;
- }
-
- List<Integer> activeWorkers = getActiveWorkers(connection);
- if (activeWorkers.size() == 0) {
- return;
- }
-
- for (int missing : missingWorkers) {
- int active = activeWorkers.get(random.nextInt(activeWorkers.size()));
- allocateNonworkersToWorkers.clearParameters();
- allocateNonworkersToWorkers.setInt(1, active);
- allocateNonworkersToWorkers.setInt(2, missing);
- allocateNonworkersToWorkers.execute();
- LOG.debug("allocating missing working ({}) to ({})", missing, active);
- }
- }
-
- private List<Integer> getMissingWorkers() throws SQLException {
- List<Integer> missingWorkers = new ArrayList<>();
- try (ResultSet rs = findMissingWorkers.executeQuery()) {
- while (rs.next()) {
- int workerId = rs.getInt(1);
- missingWorkers.add(workerId);
- LOG.debug("Worker ({}) no longer active", workerId);
- }
- }
- return missingWorkers;
- }
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
deleted file mode 100644
index 33964b3..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.tika.utils.StringUtils;
-
-public class AsyncConfig {
-
- private final int queueSize = 1000;
- private final int numWorkers = 10;
- private final int numEmitters = 1;
- private String jdbcString;
- private Path dbDir;
-
- public static AsyncConfig load(Path p) throws IOException {
- AsyncConfig asyncConfig = new AsyncConfig();
-
- if (StringUtils.isBlank(asyncConfig.getJdbcString())) {
- asyncConfig.dbDir = Files.createTempDirectory("tika-async-");
- Path dbFile = asyncConfig.dbDir.resolve("tika-async");
- asyncConfig.setJdbcString(
- "jdbc:h2:file:" + dbFile.toAbsolutePath().toString() + ";AUTO_SERVER=TRUE");
- } else {
- asyncConfig.dbDir = null;
- }
- return asyncConfig;
- }
-
- public int getQueueSize() {
- return queueSize;
- }
-
- public int getNumWorkers() {
- return numWorkers;
- }
-
- public int getNumEmitters() {
- return numEmitters;
- }
-
- public String getJdbcString() {
- return jdbcString;
- }
-
- public void setJdbcString(String jdbcString) {
- this.jdbcString = jdbcString;
- }
-
- /**
- * If no jdbc connection was specified, this
- * dir contains the h2 database. Otherwise, null.
- *
- * @return
- */
- public Path getTempDBDir() {
- return dbDir;
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
deleted file mode 100644
index b34f872..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncData.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.util.List;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-
-public class AsyncData extends EmitData {
-
- private final long taskId;
- private final FetchKey fetchKey;
- private final FetchEmitTuple.ON_PARSE_EXCEPTION onParseException;
-
- public AsyncData(@JsonProperty("taskId") long taskId,
- @JsonProperty("fetchKey") FetchKey fetchKey,
- @JsonProperty("emitKey") EmitKey emitKey, @JsonProperty("onParseException")
- FetchEmitTuple.ON_PARSE_EXCEPTION onParseException,
- @JsonProperty("metadataList") List<Metadata> metadataList) {
- super(emitKey, metadataList);
- this.taskId = taskId;
- this.fetchKey = fetchKey;
- this.onParseException = onParseException;
- }
-
- public FetchKey getFetchKey() {
- return fetchKey;
- }
-
- public long getTaskId() {
- return taskId;
- }
-
- public FetchEmitTuple.ON_PARSE_EXCEPTION getOnParseException() {
- return onParseException;
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
deleted file mode 100644
index 4c44f9c..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Supervises one forked {@code AsyncEmitterProcess} JVM. It starts the child, restarts it
- * every time it exits with a non-zero code (there is no restart limit), and records the
- * emitter's status in the {@code emitters} table.
- */
-public class AsyncEmitter implements Callable<Integer> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitter.class);
-
-    private final String connectionString;
-    private final int emitterId;
-    private final Path tikaConfigPath;
-    private final Connection connection;
-    //marks this emitter as HAS_SHUTDOWN
-    private final PreparedStatement finished;
-    //marks this emitter as RESTARTING
-    private final PreparedStatement restarting;
-
-    /**
-     * @param connection       connection used to prepare the status-update statements
-     * @param connectionString JDBC string handed to the child process via the environment
-     * @param emitterId        this emitter's id in the emitters table
-     * @param tikaConfigPath   tika-config handed to the child process via the environment
-     * @throws SQLException if the status statements cannot be prepared
-     */
-    public AsyncEmitter(Connection connection, String connectionString, int emitterId,
-                        Path tikaConfigPath) throws SQLException {
-        this.connectionString = connectionString;
-        this.emitterId = emitterId;
-        this.tikaConfigPath = tikaConfigPath;
-        this.connection = connection;
-        String sql = "update emitters set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
-                " where emitter_id = (" + emitterId + ")";
-        finished = connection.prepareStatement(sql);
-
-        sql = "update emitters set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
-                " where emitter_id = (" + emitterId + ")";
-        restarting = connection.prepareStatement(sql);
-    }
-
-    /**
-     * Runs the child process until it exits cleanly.
-     *
-     * @return 1 once the child exits with code 0
-     * @throws Exception if the child cannot be started, the wait is interrupted or a
-     *                   status update fails
-     */
-    @Override
-    public Integer call() throws Exception {
-        Process p = null;
-        try {
-            p = start();
-            int restarts = 0;
-            while (true) {
-                //named 'exited' so it does not shadow the 'finished' statement field
-                boolean exited = p.waitFor(60, TimeUnit.SECONDS);
-                if (exited) {
-                    int exitValue = p.exitValue();
-                    if (exitValue == 0) {
-                        LOG.debug("forked emitter process finished with exitValue=0");
-                        return 1;
-                    }
-                    reportCrash(++restarts, exitValue);
-                    p = start();
-                }
-            }
-        } finally {
-            if (p != null) {
-                p.destroyForcibly();
-            }
-            finished.execute();
-        }
-    }
-
-    /** Forks the emitter JVM, passing the JDBC string and config path as env variables. */
-    private Process start() throws IOException {
-        String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
-                System.getProperty("java.class.path"),
-                "org.apache.tika.pipes.async.AsyncEmitterProcess", Integer.toString(emitterId)};
-        ProcessBuilder pb = new ProcessBuilder(args);
-        pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
-        pb.environment()
-                .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
-        pb.inheritIO();
-        return pb.start();
-    }
-
-    /** Logs the crash (including how many restarts so far) and marks the emitter RESTARTING. */
-    private void reportCrash(int numRestarts, int exitValue) throws SQLException {
-        LOG.warn("emitter id={} terminated, exitValue={}, restarts={}", emitterId, exitValue,
-                numRestarts);
-        restarting.execute();
-        //TODO: should we unassign emit tasks here?
-    }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
deleted file mode 100644
index 03f5b93..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncEmitterProcess.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Paths;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
-import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-import org.apache.tika.utils.ExceptionUtils;
-
-/**
- * Entry point of a forked emitter process. It repeatedly claims task_queue rows whose status
- * is AVAILABLE_EMIT and reads the matching LZ4-compressed JSON {@link AsyncData} from the
- * emits table. It caches the results per emitter name and hands them to the
- * {@link EmitterManager} from the {@link TikaConfig}. It stops when the emitters table marks it
- * SHOULD_SHUTDOWN, when it is marked CAN_SHUTDOWN and nothing is left to emit, or when the
- * parent's stdin closes.
- */
-public class AsyncEmitterProcess {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncEmitterProcess.class);
-    private final LZ4FastDecompressor decompressor =
-            LZ4Factory.fastestInstance().fastDecompressor();
-    private final ObjectMapper objectMapper = new ObjectMapper();
-    //completes (and the JVM exits) once the parent's stdin is closed
-    private final FutureTask<Integer> stdinWatcher;
-    //max number of task_queue rows claimed per poll
-    int recordsPerPulse = 10;
-    //TODO -- parameterize these
-    //flush the cache when nothing has been added for longer than this
-    private final long emitWithinMs = 10000;
-    //flush the cache before its estimated size would exceed this many bytes
-    private final long emitMaxBytes = 10_000_000;
-    private PreparedStatement markForSelecting;
-    private PreparedStatement selectForProcessing;
-    private PreparedStatement emitStatusUpdate;
-    private PreparedStatement checkForCanShutdown;
-    private PreparedStatement checkForShouldShutdown;
-    private PreparedStatement updateEmitterStatus;
-
-    private AsyncEmitterProcess(InputStream stdin) {
-        SimpleModule module = new SimpleModule();
-        module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
-        objectMapper.registerModule(module);
-        stdinWatcher = new FutureTask<>(new ForkWatcher(stdin));
-        new Thread(stdinWatcher).start();
-    }
-
-    /**
-     * @param args {@code args[0]} is this emitter's id in the emitters table; the JDBC
-     *             string and tika-config path are read from environment variables
-     */
-    public static void main(String[] args) throws Exception {
-        String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
-        TikaConfig tikaConfig =
-                new TikaConfig(Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY)));
-        int workerId = Integer.parseInt(args[0]);
-        LOG.debug("trying to get connection {} >{}<", workerId, db);
-        try (Connection connection = DriverManager.getConnection(db)) {
-            AsyncEmitterProcess asyncEmitter = new AsyncEmitterProcess(System.in);
-            asyncEmitter.execute(connection, workerId, tikaConfig);
-        }
-        System.exit(0);
-    }
-
-    /** Batch-updates the status of every task id in {@code ids} to {@code emitted}. */
-    private static void reportEmitStatus(List<Long> ids,
-                                         AsyncWorkerProcess.TASK_STATUS_CODES emitted,
-                                         PreparedStatement emitStatusUpdate) throws SQLException {
-        for (long id : ids) {
-            emitStatusUpdate.clearParameters();
-            emitStatusUpdate.setByte(1, (byte) emitted.ordinal());
-            emitStatusUpdate.setLong(2, id);
-            emitStatusUpdate.addBatch();
-        }
-        emitStatusUpdate.executeBatch();
-    }
-
-    private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
-            throws SQLException, InterruptedException {
-        prepareStatements(connection, workerId);
-        updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.ACTIVE.ordinal());
-        EmitterManager emitterManager = tikaConfig.getEmitterManager();
-        EmitDataCache emitDataCache =
-                new EmitDataCache(emitterManager, emitMaxBytes, emitStatusUpdate);
-        try {
-            mainLoop(emitDataCache);
-        } finally {
-            //flush whatever is still cached before reporting shutdown
-            emitDataCache.emitAll();
-            updateStatus((byte) AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal());
-        }
-    }
-
-    private void mainLoop(EmitDataCache emitDataCache) throws InterruptedException, SQLException {
-
-        while (true) {
-            if (shouldShutdown()) {
-                LOG.debug("received should shutdown signal");
-                return;
-            }
-            int toEmit = markForSelecting.executeUpdate();
-            if (toEmit == 0 && canShutdown()) {
-                //avoid race condition; double check there's nothing
-                //left to emit
-                toEmit = markForSelecting.executeUpdate();
-                if (toEmit == 0) {
-                    LOG.debug("received can shutdown and didn't update any for selecting");
-                    return;
-                }
-            }
-            if (toEmit > 0) {
-                try {
-                    tryToEmitNextBatch(emitDataCache);
-                } catch (IOException e) {
-                    LOG.warn("IOException trying to emit", e);
-                }
-            }
-            if (emitDataCache.exceedsEmitWithin(emitWithinMs)) {
-                emitDataCache.emitAll();
-            }
-            Thread.sleep(500);
-        }
-    }
-
-    /** Reads the rows this emitter has selected, caches them and records their status. */
-    private void tryToEmitNextBatch(EmitDataCache emitDataCache) throws IOException, SQLException {
-        List<AsyncEmitTuple> toEmitList = new ArrayList<>();
-        try (ResultSet rs = selectForProcessing.executeQuery()) {
-            while (rs.next()) {
-                long id = rs.getLong(1);
-                Timestamp ts = rs.getTimestamp(2);
-                int uncompressedSize = rs.getInt(3);
-                Blob blob = rs.getBlob(4);
-                ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                //close the blob stream rather than leaving it to the driver
-                try (InputStream blobStream = blob.getBinaryStream()) {
-                    IOUtils.copyLarge(blobStream, bos);
-                }
-                toEmitList.add(new AsyncEmitTuple(id, ts, uncompressedSize, bos.toByteArray()));
-            }
-        }
-        List<Long> successes = new ArrayList<>();
-        List<Long> exceptions = new ArrayList<>();
-        for (AsyncEmitTuple tuple : toEmitList) {
-            try {
-                tryToEmit(tuple, emitDataCache);
-                successes.add(tuple.id);
-            } catch (IOException | SQLException e) {
-                exceptions.add(tuple.id);
-            }
-        }
-        reportEmitStatus(successes, AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS_EMIT,
-                emitStatusUpdate);
-        reportEmitStatus(exceptions, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
-                emitStatusUpdate);
-
-    }
-
-    private void updateStatus(byte status) throws SQLException {
-        updateEmitterStatus.clearParameters();
-        updateEmitterStatus.setByte(1, status);
-        updateEmitterStatus.executeUpdate();
-    }
-
-    private void tryToEmit(AsyncEmitTuple asyncEmitTuple, EmitDataCache emitDataCache)
-            throws SQLException, IOException {
-        AsyncData asyncData = deserialize(asyncEmitTuple.bytes, asyncEmitTuple.uncompressedSize);
-        emitDataCache.add(asyncData);
-    }
-
-    /** @return true if the parent's stdin has closed or the emitters table says SHOULD_SHUTDOWN */
-    boolean shouldShutdown() throws SQLException {
-        if (stdinWatcher.isDone()) {
-            LOG.info("parent inputstream closed; shutting down now");
-            //actually shut down, as the log message says
-            return true;
-        }
-        try (ResultSet rs = checkForShouldShutdown.executeQuery()) {
-            if (rs.next()) {
-                int val = rs.getInt(1);
-                return val > 0;
-            }
-        }
-        return false;
-    }
-
-    /** @return true if the emitters table marks this emitter CAN_SHUTDOWN */
-    boolean canShutdown() throws SQLException {
-        try (ResultSet rs = checkForCanShutdown.executeQuery()) {
-            if (rs.next()) {
-                int val = rs.getInt(1);
-                return val > 0;
-            }
-        }
-        return false;
-    }
-
-    private void prepareStatements(Connection connection, int workerId) throws SQLException {
-        String sql = "update task_queue set status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + ", worker_id=" +
-                workerId + ", time_stamp=CURRENT_TIMESTAMP()" + " where id in " +
-                " (select id from task_queue " + //where worker_id = " + workerId +
-                " where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
-                " order by time_stamp asc limit " + recordsPerPulse + " for update)";
-        markForSelecting = connection.prepareStatement(sql);
-
-        sql = "select q.id, q.time_stamp, uncompressed_size, bytes " + "from emits e " +
-                "join task_queue q on e.id=q.id " + "where q.status=" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED_EMIT.ordinal() + " and worker_id=" +
-                workerId + " order by time_stamp asc";
-        selectForProcessing = connection.prepareStatement(sql);
-
-        //only update the status if it is not already emitted or failed emit
-        sql = "update task_queue set status=?" + ", time_stamp=CURRENT_TIMESTAMP()" +
-                " where id=? and status not in (" +
-                AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED.ordinal() + ", " +
-                AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT.ordinal() + ")";
-
-        emitStatusUpdate = connection.prepareStatement(sql);
-
-        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal();
-        checkForCanShutdown = connection.prepareStatement(sql);
-
-        sql = "select count(1) from emitters where emitter_id=" + workerId + " and status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
-        checkForShouldShutdown = connection.prepareStatement(sql);
-
-        sql = "merge into emitters key (emitter_id) " + "values (" + workerId + ", ? )";
-        updateEmitterStatus = connection.prepareStatement(sql);
-    }
-
-    /** LZ4-decompresses {@code compressed} and reads the JSON as {@link AsyncData}. */
-    private AsyncData deserialize(byte[] compressed, int decompressedLength) throws IOException {
-        byte[] restored = new byte[decompressedLength];
-        decompressor.decompress(compressed, 0, restored, 0, decompressedLength);
-        return objectMapper.readerFor(AsyncData.class).readValue(restored);
-    }
-
-    /** Caches emit data per emitter name and flushes it when it grows too large. */
-    private static class EmitDataCache {
-        private final EmitterManager emitterManager;
-        private final long maxBytes;
-        private final PreparedStatement emitStatusUpdate;
-        long estimatedSize = 0;
-        int size = 0;
-        Map<String, List<AsyncData>> map = new HashMap<>();
-        private Instant lastAdded = Instant.now();
-
-        public EmitDataCache(EmitterManager emitterManager, long maxBytes,
-                             PreparedStatement emitStatusUpdate) {
-            this.emitterManager = emitterManager;
-            this.maxBytes = maxBytes;
-            this.emitStatusUpdate = emitStatusUpdate;
-        }
-
-        void updateEstimatedSize(long newBytes) {
-            estimatedSize += newBytes;
-        }
-
-        void add(AsyncData data) {
-            long sz = AbstractEmitter
-                    .estimateSizeInBytes(data.getEmitKey().getEmitKey(), data.getMetadataList());
-            if (estimatedSize + sz > maxBytes) {
-                LOG.debug("estimated size ({}) > maxBytes({}), going to emitAll",
-                        (estimatedSize + sz), maxBytes);
-                emitAll();
-            }
-            //count after a possible flush: emitAll() resets size to 0
-            size++;
-            map.computeIfAbsent(data.getEmitKey().getEmitterName(), k -> new ArrayList<>())
-                    .add(data);
-            updateEstimatedSize(sz);
-            lastAdded = Instant.now();
-        }
-
-        private void emitAll() {
-            int emitted = 0;
-            LOG.debug("about to emit {}", size);
-            for (Map.Entry<String, List<AsyncData>> e : map.entrySet()) {
-                Emitter emitter = emitterManager.getEmitter(e.getKey());
-
-                try {
-                    tryToEmit(emitter, e.getValue());
-                } catch (SQLException ex) {
-                    LOG.warn("failed to record emit status for emitter {}", e.getKey(), ex);
-                }
-                emitted += e.getValue().size();
-            }
-            LOG.debug("emitted: {}", emitted);
-            estimatedSize = 0;
-            size = 0;
-            map.clear();
-        }
-
-        /** Emits one emitter's batch; marks the tasks EMITTED on success, FAILED_EMIT otherwise. */
-        private long tryToEmit(Emitter emitter, List<AsyncData> cachedEmitData)
-                throws SQLException {
-            List<Long> ids = new ArrayList<>();
-            for (AsyncData d : cachedEmitData) {
-                ids.add(d.getTaskId());
-            }
-            try {
-                emitter.emit(cachedEmitData);
-            } catch (IOException | TikaEmitterException e) {
-                LOG.warn("emitter class ({}): {}", emitter.getClass(),
-                        ExceptionUtils.getStackTrace(e));
-                reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.FAILED_EMIT,
-                        emitStatusUpdate);
-                //do not fall through and also report these tasks as EMITTED
-                return 1;
-            }
-            reportEmitStatus(ids, AsyncWorkerProcess.TASK_STATUS_CODES.EMITTED, emitStatusUpdate);
-            return 1;
-        }
-
-
-        public boolean exceedsEmitWithin(long emitWithinMs) {
-            return ChronoUnit.MILLIS.between(lastAdded, Instant.now()) > emitWithinMs;
-        }
-    }
-
-    /** Blocks on the parent's stdin and exits the JVM once it returns or fails. */
-    private static class ForkWatcher implements Callable<Integer> {
-        private final InputStream in;
-
-        public ForkWatcher(InputStream in) {
-            this.in = in;
-        }
-
-        @Override
-        public Integer call() throws Exception {
-            //this should block forever
-            //if the forking process dies,
-            // this will either throw an IOException or read -1.
-            try {
-                in.read();
-            } finally {
-                LOG.warn("forking process shutdown; exiting now");
-                System.exit(0);
-            }
-            return 1;
-        }
-    }
-
-    private static class AsyncEmitTuple {
-        final long id;
-        final Timestamp timestamp;
-        final int uncompressedSize;
-        final byte[] bytes;
-
-        public AsyncEmitTuple(long id, Timestamp timestamp, int uncompressedSize, byte[] bytes) {
-            this.id = id;
-            this.timestamp = timestamp;
-            this.uncompressedSize = uncompressedSize;
-            this.bytes = bytes;
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
deleted file mode 100644
index 79ed80d..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncPipesEmitHook.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Emit hook that removes a task's row from task_queue once its emit has been handled.
- */
-public class AsyncPipesEmitHook implements AsyncEmitHook {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncPipesEmitHook.class);
-
-    private final PreparedStatement markSuccess;
-    private final PreparedStatement markFailure;
-
-    public AsyncPipesEmitHook(Connection connection) throws SQLException {
-        String sql = "delete from task_queue where id=?";
-        markSuccess = connection.prepareStatement(sql);
-        //TODO --fix this: failed tasks are currently deleted exactly like successful ones
-        markFailure = connection.prepareStatement(sql);
-    }
-
-    @Override
-    public void onSuccess(AsyncTask task) {
-        executeForTask(markSuccess, task, "success");
-    }
-
-    @Override
-    public void onFail(AsyncTask task) {
-        executeForTask(markFailure, task, "fail");
-    }
-
-    /** Binds the task id to {@code statement} and runs it; SQL failures are logged, not thrown. */
-    private static void executeForTask(PreparedStatement statement, AsyncTask task,
-                                       String outcome) {
-        try {
-            statement.clearParameters();
-            statement.setLong(1, task.getTaskId());
-            statement.execute();
-        } catch (SQLException e) {
-            LOG.warn("problem with on {}: {}", outcome, task.getTaskId(), e);
-        }
-    }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
deleted file mode 100644
index c718a48..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ /dev/null
@@ -1,594 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-
-public class AsyncProcessor implements Closeable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncProcessor.class);
-    //names of the environment variables used to hand the JDBC string and the tika-config
-    //path to forked child processes; constants, so they must not be reassignable
-    //NOTE(review): the JDBC key's value is spelled "ASYC"; left unchanged in case external
-    //launchers set the variable by its literal name
-    protected static final String TIKA_ASYNC_JDBC_KEY = "TIKA_ASYC_JDBC_KEY";
-    protected static final String TIKA_ASYNC_CONFIG_FILE_KEY = "TIKA_ASYNC_CONFIG_FILE_KEY";
-    private final Path tikaConfigPath;
-    private final ArrayBlockingQueue<FetchEmitTuple> queue;
-    private final Connection connection;
-    //workers + emitters + the assignment manager + the task enqueuer
-    private final int totalThreads;
-    private final AsyncConfig asyncConfig;
-    private PreparedStatement emittersCanShutdown;
-    private volatile boolean isShuttingDown = false;
-    private AssignmentManager assignmentManager;
-    //number of submitted threads whose completion checkActive() has observed
-    private int finishedThreads = 0;
-    private ExecutorService executorService;
-    private ExecutorCompletionService<Integer> executorCompletionService;
-    /**
-     * Loads the async config, sizes the hand-off queue and opens the JDBC connection.
-     * Threads are not started here; see {@code init()}.
-     */
-    private AsyncProcessor(Path tikaConfigPath) throws SQLException, IOException {
-        AsyncConfig config = AsyncConfig.load(tikaConfigPath);
-        this.tikaConfigPath = tikaConfigPath;
-        this.asyncConfig = config;
-        this.queue = new ArrayBlockingQueue<>(config.getQueueSize());
-        this.connection = DriverManager.getConnection(config.getJdbcString());
-        //one thread per worker and per emitter, plus the assignment manager and the enqueuer
-        int helperThreads = 2;
-        this.totalThreads = config.getNumWorkers() + config.getNumEmitters() + helperThreads;
-    }
-
-    /**
-     * Creates and starts an {@link AsyncProcessor}.
-     *
-     * @param tikaConfigPath tika-config containing the async settings
-     * @return a running processor
-     * @throws AsyncRuntimeException wrapping any SQL or I/O failure during setup
-     */
-    public static AsyncProcessor build(Path tikaConfigPath) throws AsyncRuntimeException {
-        AsyncProcessor processor = null;
-        try {
-            processor = new AsyncProcessor(tikaConfigPath);
-            processor.init();
-            return processor;
-        } catch (SQLException | IOException e) {
-            if (processor != null) {
-                //init() failed after the constructor opened the connection (and possibly
-                //after threads were submitted); release them instead of leaking them
-                if (processor.executorService != null) {
-                    processor.executorService.shutdownNow();
-                }
-                try {
-                    processor.connection.close();
-                } catch (SQLException closeException) {
-                    e.addSuppressed(closeException);
-                }
-            }
-            throw new AsyncRuntimeException(e);
-        }
-    }
-
-    /**
-     * @return the ids of all rows currently in the workers table
-     */
-    private static List<Integer> getActiveWorkers(Connection connection) throws SQLException {
-        List<Integer> workers = new ArrayList<>();
-        //close the statement as well as the result set: this is called in a polling loop
-        try (PreparedStatement findActiveWorkers =
-                     connection.prepareStatement("select worker_id from workers");
-             ResultSet rs = findActiveWorkers.executeQuery()) {
-            while (rs.next()) {
-                workers.add(rs.getInt(1));
-            }
-        }
-        return workers;
-    }
-
-    /**
-     * Tries for up to {@code offerMs} to add the whole batch to the queue in one go.
-     *
-     * @return true if the batch was queued, false if it did not fit before the timeout
-     * @throws IllegalStateException if called before init or after close()/shutdownNow()
-     */
-    public synchronized boolean offer(List<FetchEmitTuple> fetchEmitTuples, long offerMs)
-            throws AsyncRuntimeException, InterruptedException {
-        if (queue == null) {
-            throw new IllegalStateException("queue hasn't been initialized yet.");
-        } else if (isShuttingDown) {
-            throw new IllegalStateException(
-                    "Can't call offer after calling close() or " + "shutdownNow()");
-        }
-        long start = System.currentTimeMillis();
-        long elapsed = System.currentTimeMillis() - start;
-        while (elapsed < offerMs) {
-            checkActive();
-            //>= so a batch that exactly fills the remaining capacity is accepted; with >
-            //a batch as large as the whole queue could never be offered
-            if (queue.remainingCapacity() >= fetchEmitTuples.size()) {
-                try {
-                    queue.addAll(fetchEmitTuples);
-                    return true;
-                } catch (IllegalStateException e) {
-                    //swallow; NOTE(review): addAll may have queued part of the batch
-                    //before throwing, so a retry could duplicate those tuples
-                }
-            }
-            Thread.sleep(100);
-            elapsed = System.currentTimeMillis() - start;
-        }
-        return false;
-    }
-
-    /**
-     * Offers a single tuple, waiting up to {@code offerMs} for space in the queue.
-     *
-     * @return true if the tuple was queued within the timeout
-     * @throws IllegalStateException if called before init or after close()/shutdownNow()
-     */
-    public synchronized boolean offer(FetchEmitTuple t, long offerMs)
-            throws AsyncRuntimeException, InterruptedException {
-        if (queue == null) {
-            throw new IllegalStateException("queue hasn't been initialized yet.");
-        }
-        if (isShuttingDown) {
-            throw new IllegalStateException(
-                    "Can't call offer after calling close() or shutdownNow()");
-        }
-        checkActive();
-        boolean queued = queue.offer(t, offerMs, TimeUnit.MILLISECONDS);
-        return queued;
-    }
-
-    /**
-     * Polls the completion service, without blocking, for at most one finished thread.
-     * It rethrows that thread's failure, if any, and reports whether any submitted thread is
-     * still outstanding.
-     *
-     * @return {@code true} while fewer than all submitted threads have been seen to finish
-     * @throws AsyncRuntimeException if the finished thread completed with an exception
-     * @throws InterruptedException if interrupted while retrieving the finished result
-     */
-    public synchronized boolean checkActive() throws AsyncRuntimeException, InterruptedException {
-        Future<Integer> done = executorCompletionService.poll();
-        if (done == null) {
-            return finishedThreads != totalThreads;
-        }
-        try {
-            done.get();
-        } catch (ExecutionException e) {
-            throw new AsyncRuntimeException(e);
-        }
-        finishedThreads++;
-        return finishedThreads != totalThreads;
-    }
-
-    /**
-     * Creates the tables and starts every thread: the enqueuer first, then the workers,
-     * the assignment manager and finally one {@link AsyncEmitter} per configured emitter.
-     * Emitter ids are numbered after the worker ids.
-     */
-    private void init() throws SQLException {
-        setupTables();
-        this.emittersCanShutdown = connection.prepareStatement("update emitters set status=" +
-                AsyncWorkerProcess.WORKER_STATUS_CODES.CAN_SHUTDOWN.ordinal());
-        executorService = Executors.newFixedThreadPool(totalThreads);
-        executorCompletionService = new ExecutorCompletionService<>(executorService);
-
-        AsyncTaskEnqueuer enqueuer = new AsyncTaskEnqueuer(queue, connection);
-        executorCompletionService.submit(enqueuer);
-
-        //the assignment manager needs the largest retry count of any worker
-        int maxRetries = 0;
-        for (AsyncWorker worker : buildWorkers(connection, asyncConfig, tikaConfigPath)) {
-            maxRetries = Math.max(maxRetries, worker.getMaxRetries());
-            executorCompletionService.submit(worker);
-        }
-        assignmentManager = new AssignmentManager(connection, enqueuer, maxRetries);
-        executorCompletionService.submit(assignmentManager);
-
-        int firstEmitterId = asyncConfig.getNumWorkers();
-        for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
-            executorCompletionService.submit(new AsyncEmitter(connection,
-                    asyncConfig.getJdbcString(), firstEmitterId + i, tikaConfigPath));
-        }
-    }
-
-    /**
-     * Builds one {@link AsyncWorker} per configured worker, with ids 0..numWorkers-1.
-     */
-    private List<AsyncWorker> buildWorkers(Connection connection, AsyncConfig asyncConfig,
-                                           Path tikaConfigPath) throws SQLException {
-        //TODO -- make these workers configurable via the tika config, e.g. max retries
-        //and jvm args, etc.
-        List<AsyncWorker> built = new ArrayList<>();
-        int workerId = 0;
-        while (workerId < asyncConfig.getNumWorkers()) {
-            built.add(new AsyncWorker(connection, asyncConfig.getJdbcString(), workerId,
-                    tikaConfigPath));
-            workerId++;
-        }
-        return built;
-    }
-
-    /**
-     * Creates the task_queue, workers, emitters, error_log and emits tables, in that order,
-     * each with its own statement.
-     */
-    private void setupTables() throws SQLException {
-        String[] ddl = {
-                "create table task_queue " + "(id bigint auto_increment primary key," +
-                        "status tinyint," + //byte
-                        "worker_id integer," + "retry smallint," + //short
-                        "time_stamp timestamp," +
-                        "json varchar(64000))", //this is the AsyncTask ... not the emit data!
-                "create table workers (worker_id int primary key, status tinyint)",
-                "create table emitters (emitter_id int primary key, status tinyint)",
-                "create table error_log (task_id bigint, " + "fetch_key varchar(10000)," +
-                        "time_stamp timestamp," + "retry integer," + "error_code tinyint)",
-                "create table emits (" + "id bigint primary key, " + "time_stamp timestamp, " +
-                        "uncompressed_size bigint, " + "bytes blob)"
-        };
-        for (String createTable : ddl) {
-            try (Statement st = connection.createStatement()) {
-                st.execute(createTable);
-            }
-        }
-    }
-
-    /**
-     * Stops immediately. It interrupts all threads, closes the JDBC connection and deletes
-     * the temporary DB directory, if one is configured. Unlike {@link #close()}, it does not
-     * wait for queued tasks.
-     *
-     * @throws IOException if the temp directory cannot be deleted or the connection cannot
-     *                     be closed
-     */
-    public void shutdownNow() throws IOException, AsyncRuntimeException {
-        isShuttingDown = true;
-        try {
-            executorService.shutdownNow();
-        } finally {
-            //close the connection before deleting the DB files underneath it (as close() does)
-            SQLException closeException = null;
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                closeException = e;
-            }
-            if (asyncConfig.getTempDBDir() != null) {
-                FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
-            }
-            if (closeException != null) {
-                throw new IOException(closeException);
-            }
-        }
-    }
-
-    /**
-     * This is a blocking close. It waits for every task successfully submitted before this
-     * call to finish, then closes the connection and deletes the temporary DB directory
-     * (if configured). If you need to shut down immediately, use {@link #shutdownNow()}.
-     *
-     * @throws IOException if completion fails or is interrupted (the interrupt flag is
-     *                     restored), if the connection cannot be closed, or if the temp
-     *                     directory cannot be deleted
-     */
-    @Override
-    public void close() throws IOException {
-        isShuttingDown = true;
-        try {
-            completeAndShutdown();
-        } catch (InterruptedException e) {
-            //keep the interrupt visible to callers further up the stack
-            Thread.currentThread().interrupt();
-            throw new IOException(e);
-        } catch (SQLException e) {
-            throw new IOException(e);
-        } finally {
-            executorService.shutdownNow();
-            SQLException ex = null;
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                ex = e;
-            }
-            //close down processes and db
-            if (asyncConfig.getTempDBDir() != null) {
-                FileUtils.deleteDirectory(asyncConfig.getTempDBDir().toFile());
-            }
-            if (ex != null) {
-                throw new IOException(ex);
-            }
-        }
-    }
-
-    /**
-     * Blocks until everything finishes. It signals the enqueuer, waits for the assignment
-     * manager to report completion, tells the emitters they may shut down, and then waits
-     * until every submitted thread has been observed to finish. If interrupted while waiting
-     * for the threads, it returns early with the interrupt flag restored.
-     */
-    private void completeAndShutdown() throws SQLException, InterruptedException {
-
-        //blocking...notify taskEnqueuer
-        queue.put(FetchIterator.COMPLETED_SEMAPHORE);
-
-        //wait for assignmentManager to finish
-        //it will only complete after the task enqueuer has completed
-        //and there are no more parse tasks available, selected or in process
-        while (!assignmentManager.hasCompletedTasks()) {
-            Thread.sleep(100);
-        }
-
-        emittersCanShutdown.executeUpdate();
-
-        //wait for emitters (and all other submitted threads) to finish
-        try {
-            while (checkActive()) {
-                //checkActive() polls without blocking; back off briefly instead of spinning
-                Thread.sleep(10);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    //this reads fetchemittuples from the queue and inserts them in the db
-    //for the workers to read
-    private static class AsyncTaskEnqueuer implements Callable<Integer> {
-        //how long to wait for at least one worker to register before giving up
-        private static final long MAX_WAIT_FOR_WORKERS_MS = 600000;
-
-        private final PreparedStatement insert;
-
-        private final ArrayBlockingQueue<FetchEmitTuple> queue;
-        private final Connection connection;
-        private final Random random = new Random();
-
-        private volatile boolean isComplete = false;
-
-        AsyncTaskEnqueuer(ArrayBlockingQueue<FetchEmitTuple> queue, Connection connection)
-                throws SQLException {
-            this.queue = queue;
-            this.connection = connection;
-            String sql = "insert into task_queue (status, time_stamp, worker_id, retry, json) " +
-                    "values (?,CURRENT_TIMESTAMP(),?,?,?)";
-            insert = connection.prepareStatement(sql);
-        }
-
-        /**
-         * Moves tuples from the queue into task_queue until the completed semaphore arrives.
-         *
-         * @return 1 once the completed semaphore has been seen
-         * @throws IllegalStateException if no worker registers within
-         *                               {@link #MAX_WAIT_FOR_WORKERS_MS}
-         */
-        @Override
-        public Integer call() throws Exception {
-            List<Integer> workers = new ArrayList<>();
-            while (true) {
-                FetchEmitTuple t = queue.poll(1, TimeUnit.SECONDS);
-                if (t == null) {
-                    //nothing offered within the poll window; keep waiting
-                    continue;
-                }
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
-                    isComplete = true;
-                    return 1;
-                }
-                LOG.debug("enqueing to db {}", t);
-                if (workers.isEmpty()) {
-                    workers = waitForWorkers();
-                }
-                insert(t, workers);
-            }
-        }
-
-        boolean isComplete() {
-            return isComplete;
-        }
-
-        /**
-         * Polls the workers table until at least one worker is registered.
-         *
-         * @return a non-empty list of worker ids
-         * @throws IllegalStateException if none registers in time (instead of failing later
-         *                               with an opaque error from {@code Random.nextInt(0)})
-         */
-        private List<Integer> waitForWorkers() throws SQLException, InterruptedException {
-            long start = System.currentTimeMillis();
-            List<Integer> workers = getActiveWorkers(connection);
-            while (workers.isEmpty()) {
-                if (System.currentTimeMillis() - start >= MAX_WAIT_FOR_WORKERS_MS) {
-                    throw new IllegalStateException(
-                            "no workers registered within " + MAX_WAIT_FOR_WORKERS_MS + " ms");
-                }
-                Thread.sleep(100);
-                workers = getActiveWorkers(connection);
-            }
-            return workers;
-        }
-
-        /** Inserts {@code t} as an AVAILABLE task assigned to a randomly chosen worker. */
-        private void insert(FetchEmitTuple t, List<Integer> workers)
-                throws IOException, SQLException {
-            int workerId = workers.size() == 1 ? workers.get(0) :
-                    workers.get(random.nextInt(workers.size()));
-            insert.clearParameters();
-            insert.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
-            insert.setInt(2, workerId);
-            insert.setShort(3, (short) 0);
-            insert.setString(4, JsonFetchEmitTuple.toJson(t));
-            insert.execute();
-        }
-    }
-
- private static class AssignmentManager implements Callable {
-
- private final Connection connection;
- private final AsyncTaskEnqueuer enqueuer;
- private final PreparedStatement getQueueDistribution;
- private final PreparedStatement findMissingWorkers;
- private final PreparedStatement allocateNonworkersToWorkers;
- private final PreparedStatement reallocate;
- private final PreparedStatement countAvailableTasks;
- private final PreparedStatement shutdownWorker;
- private final PreparedStatement findMaxRetrieds;
- private final PreparedStatement logMaxRetrieds;
- private final PreparedStatement removeMaxRetrieds;
- private final Random random = new Random();
- private final int maxRetries;
- private volatile boolean hasCompleted = false;
-
-
- public AssignmentManager(Connection connection, AsyncTaskEnqueuer enqueuer, int maxRetries)
- throws SQLException {
- this.connection = connection;
- this.enqueuer = enqueuer;
- this.maxRetries = maxRetries;
- //this gets workers and # of tasks in desc order of number of tasks
- String sql = "select w.worker_id, p.cnt " + "from workers w " +
- "left join (select worker_id, count(1) as cnt from task_queue " +
- "where status=" + AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() +
- " group by worker_id)" + " p on p.worker_id=w.worker_id order by p.cnt desc";
- getQueueDistribution = connection.prepareStatement(sql);
- //find workers that have assigned tasks but are not in the
- //workers table
- sql = "select p.worker_id, count(1) as cnt from task_queue p " +
- "left join workers w on p.worker_id=w.worker_id " +
- "where w.worker_id is null group by p.worker_id";
- findMissingWorkers = connection.prepareStatement(sql);
-
- sql = "update task_queue set worker_id=? where worker_id=?";
- allocateNonworkersToWorkers = connection.prepareStatement(sql);
-
- //current strategy reallocate tasks from longest queue to shortest
- //TODO: might consider randomly shuffling or other algorithms
- sql = "update task_queue set worker_id= ? where id in " +
- "(select id from task_queue where " + "worker_id = ? and " + "rand() < 0.8 " +
- "and status=0 for update)";
- reallocate = connection.prepareStatement(sql);
-
- //get those tasks that are in the parse phase
- //if they are selected or in process, it is possible that
- //they'll need to be retried. So, include all statuses
- //meaning that the parse has not completed.
- sql = "select count(1) from task_queue where status in (" +
- AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal() + ", " +
- AsyncWorkerProcess.TASK_STATUS_CODES.SELECTED.ordinal() + ", " +
- AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal() + ")";
- countAvailableTasks = connection.prepareStatement(sql);
-
- sql = "update workers set status=" +
- AsyncWorkerProcess.WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal() +
- " where worker_id = ?";
- shutdownWorker = connection.prepareStatement(sql);
-
- sql = "select id, retry, json from task_queue where retry >=" + maxRetries;
- findMaxRetrieds = connection.prepareStatement(sql);
-
- sql = "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code)" +
- "values (?,?,CURRENT_TIMESTAMP(), ?," +
- AsyncWorkerProcess.ERROR_CODES.MAX_RETRIES.ordinal() + ")";
- logMaxRetrieds = connection.prepareStatement(sql);
-
- sql = "delete from task_queue where id=?";
- removeMaxRetrieds = connection.prepareStatement(sql);
- }
-
- protected boolean hasCompletedTasks() {
- return hasCompleted;
- }
-
- @Override
- public Integer call() throws Exception {
-
- while (true) {
- removeMaxRetrieds();
- List<Integer> missingWorkers = getMissingWorkers();
- reallocateFromMissingWorkers(missingWorkers);
- redistribute();
- if (isComplete()) {
- notifyWorkers();
- return 1;
- }
- Thread.sleep(200);
- }
- }
-
- private void removeMaxRetrieds() throws SQLException {
- Set<Long> toRemove = new HashSet<>();
- try (ResultSet rs = findMaxRetrieds.executeQuery()) {
- while (rs.next()) {
- long id = rs.getLong(1);
- String json = rs.getString(2);
- int retries = rs.getInt(3);
- toRemove.add(id);
- FetchEmitTuple t;
- try (Reader reader = new StringReader(json)) {
- t = JsonFetchEmitTuple.fromJson(reader);
- } catch (Exception e) {
- e.printStackTrace();
- //need to log this in the error_logs table
- continue;
- }
- logMaxRetrieds.clearParameters();
- logMaxRetrieds.setLong(1, id);
- logMaxRetrieds.setString(2, t.getFetchKey().getFetchKey());
- logMaxRetrieds.setInt(3, retries);
- logMaxRetrieds.addBatch();
- }
- }
- logMaxRetrieds.executeBatch();
-
- for (Long id : toRemove) {
- removeMaxRetrieds.clearParameters();
- removeMaxRetrieds.setLong(1, id);
- removeMaxRetrieds.addBatch();
- }
- removeMaxRetrieds.executeBatch();
- }
-
- private void notifyWorkers() throws SQLException {
- for (int workerId : getActiveWorkers(connection)) {
- shutdownWorker.clearParameters();
- shutdownWorker.setInt(1, workerId);
- shutdownWorker.execute();
- }
- }
-
- private boolean isComplete() throws SQLException {
- if (hasCompleted) {
- return hasCompleted;
- }
- if (!enqueuer.isComplete) {
- return false;
- }
- try (ResultSet rs = countAvailableTasks.executeQuery()) {
- while (rs.next()) {
- int availTasks = rs.getInt(1);
- if (availTasks == 0) {
- hasCompleted = true;
- return true;
- }
- }
- }
- return false;
- }
-
- private void redistribute() throws SQLException {
- //parallel lists of workerid = task queue size
- List<Integer> workerIds = new ArrayList<>();
- List<Integer> queueSize = new ArrayList<>();
- int totalTasks = 0;
-
- try (ResultSet rs = getQueueDistribution.executeQuery()) {
- while (rs.next()) {
- int workerId = rs.getInt(1);
- int numTasks = rs.getInt(2);
- workerIds.add(workerId);
- queueSize.add(numTasks);
- LOG.debug("workerId: ({}) numTasks: ({})", workerId, numTasks);
- totalTasks += numTasks;
- }
- }
- if (workerIds.size() == 0) {
- return;
- }
- int averagePerWorker = Math.round((float) totalTasks / (float) workerIds.size());
- int midPoint = Math.round((float) queueSize.size() / 2) + 1;
- for (int i = queueSize.size() - 1, j = 0; i > midPoint && j < midPoint; i--, j++) {
- int shortestQueue = queueSize.get(i);
- int longestQueue = queueSize.get(j);
- if ((shortestQueue < 5 && longestQueue > 5) ||
- longestQueue > 5 && longestQueue > (int) (1.5 * averagePerWorker)) {
- int shortestQueueWorker = workerIds.get(i);
- int longestQueueWorker = workerIds.get(j);
- reallocate.clearParameters();
- reallocate.setLong(1, shortestQueueWorker);
- reallocate.setLong(2, longestQueueWorker);
- reallocate.execute();
- }
- }
-
- }
-
- private void reallocateFromMissingWorkers(List<Integer> missingWorkers)
- throws SQLException {
-
- if (missingWorkers.size() == 0) {
- return;
- }
-
- List<Integer> activeWorkers = getActiveWorkers(connection);
- if (activeWorkers.size() == 0) {
- return;
- }
-
- for (int missing : missingWorkers) {
- int active = activeWorkers.get(random.nextInt(activeWorkers.size()));
- allocateNonworkersToWorkers.clearParameters();
- allocateNonworkersToWorkers.setInt(1, active);
- allocateNonworkersToWorkers.setInt(2, missing);
- allocateNonworkersToWorkers.execute();
- LOG.debug("allocating missing working ({}) to ({})", missing, active);
- }
- }
-
- private List<Integer> getMissingWorkers() throws SQLException {
- List<Integer> missingWorkers = new ArrayList<>();
- try (ResultSet rs = findMissingWorkers.executeQuery()) {
- while (rs.next()) {
- int workerId = rs.getInt(1);
- missingWorkers.add(workerId);
- LOG.debug("Worker ({}) no longer active", workerId);
- }
- }
- return missingWorkers;
- }
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
deleted file mode 100644
index d9cca42..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncTask.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.tika.pipes.FetchEmitTuple;
-
-public class AsyncTask extends FetchEmitTuple {
-
- public static final AsyncTask SHUTDOWN_SEMAPHORE =
- new AsyncTask(-1, (short) -1, new FetchEmitTuple(null, null, null));
- private final short retry;
- private long taskId;
-
- public AsyncTask(@JsonProperty("taskId") long taskId, @JsonProperty("retry") short retry,
- @JsonProperty("fetchEmitTuple") FetchEmitTuple fetchEmitTuple) {
- super(fetchEmitTuple.getFetchKey(), fetchEmitTuple.getEmitKey(),
- fetchEmitTuple.getMetadata());
- this.taskId = taskId;
- this.retry = retry;
- }
-
- public long getTaskId() {
- return taskId;
- }
-
- public void setTaskId(long taskId) {
- this.taskId = taskId;
- }
-
- public short getRetry() {
- return retry;
- }
-
- @Override
- public String toString() {
- return "AsyncTask{" + "taskId=" + taskId + ", retry=" + retry + '}';
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
deleted file mode 100644
index ee3882b..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorker.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY;
-import static org.apache.tika.pipes.async.AsyncProcessor.TIKA_ASYNC_JDBC_KEY;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.pipes.FetchEmitTuple;
-
-/**
- * This controls monitoring of the AsyncWorkerProcess
- * and updates to the db on crashes etc.
- */
-public class AsyncWorker implements Callable<Integer> {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncWorker.class);
-
-
- private final String connectionString;
- private final int workerId;
- private final Path tikaConfigPath;
- private final Connection connection;
- private final PreparedStatement finished;
- private final PreparedStatement restarting;
- private final PreparedStatement selectActiveTasks;
- private final PreparedStatement insertErrorLog;
- private final PreparedStatement resetStatus;
- //TODO: make this configurable
- private final int maxRetries = 2;
-
- public AsyncWorker(Connection connection, String connectionString, int workerId,
- Path tikaConfigPath) throws SQLException {
- this.connectionString = connectionString;
- this.workerId = workerId;
- this.tikaConfigPath = tikaConfigPath;
- this.connection = connection;
- String sql = "update workers set status=" +
- AsyncWorkerProcess.WORKER_STATUS_CODES.HAS_SHUTDOWN.ordinal() +
- " where worker_id = (" + workerId + ")";
- finished = connection.prepareStatement(sql);
-
- sql = "update workers set status=" +
- AsyncWorkerProcess.WORKER_STATUS_CODES.RESTARTING.ordinal() +
- " where worker_id = (" + workerId + ")";
- restarting = connection.prepareStatement(sql);
- //this checks if the process was able to reset the status
- sql = "select id, retry, json from task_queue where worker_id=" + workerId +
- " and status=" + AsyncWorkerProcess.TASK_STATUS_CODES.IN_PROCESS.ordinal();
- selectActiveTasks = connection.prepareStatement(sql);
-
- //if not, this is called to insert into the error log
- insertErrorLog = prepareInsertErrorLog(connection);
-
- //and this is called to reset the status on error
- resetStatus = prepareReset(connection);
- }
-
- static void reportAndReset(AsyncTask task, AsyncWorkerProcess.ERROR_CODES errorCode,
- PreparedStatement insertErrorLog, PreparedStatement resetStatus,
- Logger logger) {
- try {
- insertErrorLog.clearParameters();
- insertErrorLog.setLong(1, task.getTaskId());
- insertErrorLog.setString(2, task.getFetchKey().getFetchKey());
- insertErrorLog.setInt(3, task.getRetry());
- insertErrorLog.setByte(4, (byte) errorCode.ordinal());
- insertErrorLog.execute();
- } catch (SQLException e) {
- logger.error("Can't update error log", e);
- }
-
- try {
- resetStatus.clearParameters();
- resetStatus.setByte(1, (byte) AsyncWorkerProcess.TASK_STATUS_CODES.AVAILABLE.ordinal());
- resetStatus.setShort(2, (short) (task.getRetry() + 1));
- resetStatus.setLong(3, task.getTaskId());
- resetStatus.execute();
- } catch (SQLException e) {
- logger.error("Can't reset try status", e);
- }
- }
-
- static PreparedStatement prepareInsertErrorLog(Connection connection) throws SQLException {
- //if not, this is called to insert into the error log
- return connection.prepareStatement(
- "insert into error_log (task_id, fetch_key, time_stamp, retry, error_code) " +
- " values (?,?,CURRENT_TIMESTAMP(),?,?)");
- }
-
- static PreparedStatement prepareReset(Connection connection) throws SQLException {
- //and this is called to reset the status on error
- return connection.prepareStatement(
- "update task_queue set " + "status=?, " + "time_stamp=CURRENT_TIMESTAMP(), " +
- "retry=? " + "where id=?");
- }
-
- @Override
- public Integer call() throws Exception {
- Process p = null;
- try {
- p = start();
- int restarts = 0;
- while (true) {
- boolean finished = p.waitFor(60, TimeUnit.SECONDS);
- if (finished) {
- int exitValue = p.exitValue();
- if (exitValue == 0) {
- LOG.debug("forked worker process finished with exitValue=0");
- return 1;
- }
- reportCrash(++restarts, exitValue);
- p = start();
- }
- }
- } finally {
- if (p != null) {
- p.destroyForcibly();
- }
- finished.execute();
- }
- }
-
- public int getMaxRetries() {
- return maxRetries;
- }
-
- private Process start() throws IOException {
- String[] args = new String[]{"java", "-Djava.awt.headless=true", "-cp",
- System.getProperty("java.class.path"),
- "org.apache.tika.pipes.async.AsyncWorkerProcess", Integer.toString(workerId)};
- ProcessBuilder pb = new ProcessBuilder(args);
- pb.environment().put(TIKA_ASYNC_JDBC_KEY, connectionString);
- pb.environment()
- .put(TIKA_ASYNC_CONFIG_FILE_KEY, tikaConfigPath.toAbsolutePath().toString());
- pb.inheritIO();
- return pb.start();
- }
-
- private void reportCrash(int numRestarts, int exitValue) throws SQLException, IOException {
- LOG.warn("worker id={} terminated, exitValue={}", workerId, exitValue);
- restarting.execute();
- List<AsyncTask> activeTasks = new ArrayList<>();
- try (ResultSet rs = selectActiveTasks.executeQuery()) {
- long taskId = rs.getLong(1);
- short retry = rs.getShort(2);
- String json = rs.getString(3);
- FetchEmitTuple tuple = JsonFetchEmitTuple.fromJson(new StringReader(json));
- activeTasks.add(new AsyncTask(taskId, retry, tuple));
- }
- if (activeTasks.size() == 0) {
- LOG.debug("worker reset active tasks, nothing extra to report");
- return;
- }
-
- if (activeTasks.size() > 1) {
- LOG.warn("more than one active task? this should never happen!");
- }
-
- for (AsyncTask t : activeTasks) {
- reportAndReset(t, AsyncWorkerProcess.ERROR_CODES.UNKNOWN_PARSE, insertErrorLog,
- resetStatus, LOG);
- }
-
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java b/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
deleted file mode 100644
index 43f96df..0000000
--- a/tika-pipes/tika-pipes-async/src/main/java/org/apache/tika/pipes/async/AsyncWorkerProcess.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import static org.apache.tika.pipes.async.AsyncTask.SHUTDOWN_SEMAPHORE;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareInsertErrorLog;
-import static org.apache.tika.pipes.async.AsyncWorker.prepareReset;
-import static org.apache.tika.pipes.async.AsyncWorker.reportAndReset;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.StringReader;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import net.jpountz.lz4.LZ4Factory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.EncryptedDocumentException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
-import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.apache.tika.parser.RecursiveParserWrapper;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.sax.BasicContentHandlerFactory;
-import org.apache.tika.sax.RecursiveParserWrapperHandler;
-import org.apache.tika.utils.StringUtils;
-
-public class AsyncWorkerProcess {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncWorkerProcess.class);
- //make these all configurable
- private static final long SHUTDOWN_AFTER_MS = 120000;
- private static final long PULSE_MS = 1000;
- private final long parseTimeoutMs = 60000;
-
- public static void main(String[] args) throws Exception {
- Path tikaConfigPath = Paths.get(System.getenv(AsyncProcessor.TIKA_ASYNC_CONFIG_FILE_KEY));
- String db = System.getenv(AsyncProcessor.TIKA_ASYNC_JDBC_KEY);
- TikaConfig tikaConfig = new TikaConfig(tikaConfigPath);
- int workerId = Integer.parseInt(args[0]);
- LOG.debug("trying to get connection {} >{}<", workerId, db);
- try (Connection connection = DriverManager.getConnection(db)) {
- AsyncWorkerProcess asyncWorker = new AsyncWorkerProcess();
- asyncWorker.execute(connection, workerId, tikaConfig);
- }
- System.exit(0);
- }
-
- private void execute(Connection connection, int workerId, TikaConfig tikaConfig)
- throws SQLException {
- //3 = worker + forkwatcher + active task
- ExecutorService service = Executors.newFixedThreadPool(3);
- ExecutorCompletionService<Integer> executorCompletionService =
- new ExecutorCompletionService<>(service);
-
- executorCompletionService
- .submit(new Worker(connection, workerId, tikaConfig, parseTimeoutMs));
- executorCompletionService.submit(new ForkWatcher(System.in));
-
- int completed = 0;
-
- //if either one stops, we need to stop
- try {
- while (completed < 1) {
- Future<Integer> future = executorCompletionService.poll(60, TimeUnit.SECONDS);
- if (future != null) {
- completed++;
- future.get();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- LOG.error("worker " + workerId + " had a mainloop exception", e);
- } finally {
- service.shutdownNow();
- }
- return;
- }
-
- enum TASK_STATUS_CODES {
- AVAILABLE, SELECTED, IN_PROCESS, AVAILABLE_EMIT, SELECTED_EMIT, IN_PROCESS_EMIT,
- FAILED_EMIT, EMITTED
- }
-
- public enum WORKER_STATUS_CODES {
- ACTIVE, RESTARTING, HIBERNATING, CAN_SHUTDOWN,//if there's nothing else to process, shutdown
- SHOULD_SHUTDOWN, //shutdown now whether or not there's anything else to process
- HAS_SHUTDOWN
- }
-
- enum ERROR_CODES {
- TIMEOUT, SECURITY_EXCEPTION, OTHER_EXCEPTION, OOM, OTHER_ERROR, UNKNOWN_PARSE, MAX_RETRIES,
- EMIT_SERIALIZATION, EMIT_SQL_INSERT_EXCEPTION, EMIT_SQL_SELECT_EXCEPTION,
- EMIT_DESERIALIZATION, EMIT_EXCEPTION
- }
-
- private static class TaskQueue {
- private final Connection connection;
- private final int workerId;
-
- private final PreparedStatement markForSelecting;
- private final PreparedStatement selectForProcessing;
- private final PreparedStatement markForProcessing;
- private final PreparedStatement checkForShutdown;
-
-
- TaskQueue(Connection connection, int workerId) throws SQLException {
- this.connection = connection;
- this.workerId = workerId;
- //TODO -- need to update timestamp
- String sql = "update task_queue set status=" + TASK_STATUS_CODES.SELECTED.ordinal() +
- ", time_stamp=CURRENT_TIMESTAMP()" + " where id = " +
- " (select id from task_queue where worker_id = " + workerId + " and status=" +
- TASK_STATUS_CODES.AVAILABLE.ordinal() +
- " order by time_stamp asc limit 1 for update)";
- markForSelecting = connection.prepareStatement(sql);
- sql = "select id, retry, json from task_queue where status=" +
- TASK_STATUS_CODES.SELECTED.ordinal() + " and " + " worker_id=" + workerId +
- " order by time_stamp asc limit 1";
- selectForProcessing = connection.prepareStatement(sql);
- sql = "update task_queue set status=" + TASK_STATUS_CODES.IN_PROCESS.ordinal() +
- ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
- markForProcessing = connection.prepareStatement(sql);
-
- sql = "select count(1) from workers where worker_id=" + workerId + " and status=" +
- WORKER_STATUS_CODES.SHOULD_SHUTDOWN.ordinal();
- checkForShutdown = connection.prepareStatement(sql);
- }
-
- AsyncTask poll(long pollMs) throws InterruptedException, IOException, SQLException {
- long start = System.currentTimeMillis();
- long elapsed = System.currentTimeMillis() - start;
- while (elapsed < pollMs) {
- if (shouldShutdown()) {
- return SHUTDOWN_SEMAPHORE;
- }
- int i = markForSelecting.executeUpdate();
- if (i == 0) {
-// debugQueue();
- Thread.sleep(PULSE_MS);
- } else {
- long taskId = -1;
- short retry = -1;
- String json = "";
- try (ResultSet rs = selectForProcessing.executeQuery()) {
- while (rs.next()) {
- taskId = rs.getLong(1);
- retry = rs.getShort(2);
- json = rs.getString(3);
- }
- }
- markForProcessing.clearParameters();
- markForProcessing.setLong(1, taskId);
- markForProcessing.execute();
-
- FetchEmitTuple t = null;
- try (Reader reader = new StringReader(json)) {
- t = JsonFetchEmitTuple.fromJson(reader);
- }
- AsyncTask task = new AsyncTask(taskId, retry, t);
- return task;
- }
- elapsed = System.currentTimeMillis() - start;
- }
- return null;
- }
-
- private void debugQueue() throws SQLException {
- try (ResultSet rs = connection.createStatement()
- .executeQuery("select id, status, worker_id from task_queue limit 10")) {
- while (rs.next()) {
- System.out.println(
- "id: " + rs.getInt(1) + " status: " + rs.getInt(2) + " worker_id: " +
- rs.getInt(3));
- }
- }
- }
-
- boolean shouldShutdown() throws SQLException {
- try (ResultSet rs = checkForShutdown.executeQuery()) {
- if (rs.next()) {
- int val = rs.getInt(1);
- return val > 0;
- }
- }
- return false;
- }
- }
-
-
- private static class Worker implements Callable<Integer> {
-
- private final Connection connection;
- private final int workerId;
- private final RecursiveParserWrapper parser;
- private final TikaConfig tikaConfig;
- private final long parseTimeoutMs;
- private final PreparedStatement insertErrorLog;
- private final PreparedStatement resetStatus;
- private final PreparedStatement insertEmitData;
- private final PreparedStatement updateStatusForEmit;
- private final ObjectMapper objectMapper = new ObjectMapper();
- LZ4Factory factory = LZ4Factory.fastestInstance();
- private final ExecutorService executorService;
- private final ExecutorCompletionService<AsyncData> executorCompletionService;
-
- public Worker(Connection connection, int workerId, TikaConfig tikaConfig,
- long parseTimeoutMs) throws SQLException {
- this.connection = connection;
- this.workerId = workerId;
- this.parser = new RecursiveParserWrapper(tikaConfig.getParser());
- this.tikaConfig = tikaConfig;
- this.executorService = Executors.newFixedThreadPool(1);
- this.executorCompletionService = new ExecutorCompletionService<>(executorService);
- this.parseTimeoutMs = parseTimeoutMs;
-
- SimpleModule module = new SimpleModule();
- module.addSerializer(Metadata.class, new JsonMetadataSerializer());
- objectMapper.registerModule(module);
- String sql = "merge into workers key (worker_id) " + "values (" + workerId + ", " +
- WORKER_STATUS_CODES.ACTIVE.ordinal() + ")";
- connection.createStatement().execute(sql);
- insertErrorLog = prepareInsertErrorLog(connection);
- resetStatus = prepareReset(connection);
- insertEmitData = prepareInsertEmitData(connection);
- sql = "update task_queue set status=" + TASK_STATUS_CODES.AVAILABLE_EMIT.ordinal() +
- ", time_stamp=CURRENT_TIMESTAMP()" + " where id=?";
- updateStatusForEmit = connection.prepareStatement(sql);
- }
-
- static PreparedStatement prepareInsertEmitData(Connection connection) throws SQLException {
- return connection.prepareStatement(
- "insert into emits (id, time_stamp, uncompressed_size, bytes) " +
- " values (?,CURRENT_TIMESTAMP(),?,?)");
- }
-
- public Integer call() throws Exception {
- AsyncTask task = null;
- try {
-
- TaskQueue queue = new TaskQueue(connection, workerId);
-
- long lastProcessed = System.currentTimeMillis();
-
- while (true) {
-
- task = queue.poll(1000);
- if (task == null) {
- long elapsed = System.currentTimeMillis() - lastProcessed;
- if (elapsed > SHUTDOWN_AFTER_MS) {
- LOG.debug("shutting down after no assignments in {}ms", elapsed);
- return 1;
- }
- } else if (task == SHUTDOWN_SEMAPHORE) {
- break;
- } else {
- processTask(task);
- lastProcessed = System.currentTimeMillis();
- }
- }
- } catch (TimeoutException e) {
- LOG.warn(task.getFetchKey().getFetchKey(), e);
- reportAndReset(task, ERROR_CODES.TIMEOUT, insertErrorLog, resetStatus, LOG);
- } catch (SecurityException e) {
- LOG.warn(task.getFetchKey().getFetchKey(), e);
- reportAndReset(task, ERROR_CODES.SECURITY_EXCEPTION, insertErrorLog, resetStatus,
- LOG);
- } catch (Exception e) {
- e.printStackTrace();
- LOG.warn(task.getFetchKey().getFetchKey(), e);
- reportAndReset(task, ERROR_CODES.OTHER_EXCEPTION, insertErrorLog, resetStatus, LOG);
- } catch (OutOfMemoryError e) {
- LOG.warn(task.getFetchKey().getFetchKey(), e);
- reportAndReset(task, ERROR_CODES.OOM, insertErrorLog, resetStatus, LOG);
- } catch (Error e) {
- LOG.warn(task.getFetchKey().getFetchKey(), e);
- reportAndReset(task, ERROR_CODES.OTHER_ERROR, insertErrorLog, resetStatus, LOG);
- } finally {
- executorService.shutdownNow();
- return 1;
- }
- }
-
- private void processTask(AsyncTask task) throws Exception {
-
- if (task == SHUTDOWN_SEMAPHORE) {
- LOG.debug("received shutdown notification");
- return;
- } else {
- executorCompletionService
- .submit(new TaskProcessor(task, tikaConfig, parser, workerId));
- Future<AsyncData> future =
- executorCompletionService.poll(parseTimeoutMs, TimeUnit.MILLISECONDS);
- if (future == null) {
- handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
- } else {
- AsyncData asyncData = future.get(1000, TimeUnit.MILLISECONDS);
- if (asyncData == null) {
- handleTimeout(task.getTaskId(), task.getFetchKey().getFetchKey());
- }
- boolean shouldEmit = checkForParseException(asyncData);
- if (shouldEmit) {
- try {
- emit(asyncData);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
- ERROR_CODES.EMIT_SERIALIZATION.ordinal());
- } catch (SQLException e) {
- e.printStackTrace();
- recordBadEmit(task.getTaskId(), task.getFetchKey().getFetchKey(),
- ERROR_CODES.EMIT_SQL_INSERT_EXCEPTION.ordinal());
- }
- }
- }
- }
- }
-
- private void recordBadEmit(long taskId, String key, int ordinal) {
- //stub
- }
-
- private void emit(AsyncData asyncData) throws SQLException, JsonProcessingException {
- insertEmitData.clearParameters();
- insertEmitData.setLong(1, asyncData.getTaskId());
- byte[] bytes = objectMapper.writeValueAsBytes(asyncData);
- byte[] compressed = factory.fastCompressor().compress(bytes);
- insertEmitData.setLong(2, bytes.length);
- insertEmitData.setBlob(3, new ByteArrayInputStream(compressed));
- insertEmitData.execute();
- updateStatusForEmit.clearParameters();
- updateStatusForEmit.setLong(1, asyncData.getTaskId());
- updateStatusForEmit.execute();
- }
-
- private void handleTimeout(long taskId, String key) throws TimeoutException {
- LOG.warn("timeout taskid:{} fetchKey:{}", taskId, key);
- throw new TimeoutException(key);
- }
-
- /**
- * Logs container and embedded parse exceptions recorded in the parse result and
- * decides whether the result should be emitted.
- *
- * @param asyncData parse result; may be {@code null}
- * @return {@code false} if there is nothing to emit, or if the container document
- *         threw and the task's policy is {@code ON_PARSE_EXCEPTION.SKIP};
- *         {@code true} otherwise
- */
- private boolean checkForParseException(AsyncData asyncData) {
- if (asyncData == null) {
- //must not dereference asyncData for the log message here
- LOG.warn("null emit data");
- return false;
- }
- String fetchKey = asyncData.getFetchKey().getFetchKey();
- List<Metadata> metadataList = asyncData.getMetadataList();
- if (metadataList == null || metadataList.isEmpty()) {
- LOG.warn("empty or null emit data ({})", fetchKey);
- return false;
- }
- boolean shouldEmit = true;
- Metadata container = metadataList.get(0);
- String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
- if (stack != null) {
- LOG.warn("fetchKey ({}) container parse exception ({})", fetchKey, stack);
- if (asyncData.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
- shouldEmit = false;
- }
- }
- //embedded exceptions are only logged; they never block the emit
- for (int i = 1; i < metadataList.size(); i++) {
- String embeddedStack = metadataList.get(i).get(TikaCoreProperties.EMBEDDED_EXCEPTION);
- if (embeddedStack != null) {
- LOG.warn("fetchKey ({}) embedded parse exception ({})", fetchKey, embeddedStack);
- }
- }
- return shouldEmit;
- }
- }
-
- private static class TaskProcessor implements Callable<AsyncData> {
-
- private final AsyncTask task;
- private final Parser parser;
- private final TikaConfig tikaConfig;
- private final int workerId;
-
- public TaskProcessor(AsyncTask task, TikaConfig tikaConfig, Parser parser, int workerId) {
- this.task = task;
- this.parser = parser;
- this.tikaConfig = tikaConfig;
- this.workerId = workerId;
- }
-
- public AsyncData call() throws Exception {
- Metadata userMetadata = task.getMetadata();
- Metadata metadata = new Metadata();
- String fetcherName = task.getFetchKey().getFetcherName();
- String fetchKey = task.getFetchKey().getFetchKey();
- List<Metadata> metadataList = null;
- try (InputStream stream = tikaConfig.getFetcherManager().getFetcher(fetcherName)
- .fetch(fetchKey, metadata)) {
- metadataList = parseMetadata(task.getFetchKey(), stream, metadata);
- } catch (SecurityException e) {
- throw e;
- }
- injectUserMetadata(userMetadata, metadataList);
- EmitKey emitKey = task.getEmitKey();
- if (StringUtils.isBlank(emitKey.getEmitKey())) {
- emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
- task.setEmitKey(emitKey);
- }
- return new AsyncData(task.getTaskId(), task.getFetchKey(), task.getEmitKey(),
- task.getOnParseException(), metadataList);
- }
-
- private List<Metadata> parseMetadata(FetchKey fetchKey, InputStream stream,
- Metadata metadata) {
- //make these configurable
- BasicContentHandlerFactory.HANDLER_TYPE type =
- BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
- int writeLimit = -1;
- int maxEmbeddedResources = 1000;
-
- RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
- new BasicContentHandlerFactory(type, writeLimit), maxEmbeddedResources,
- tikaConfig.getMetadataFilter());
- ParseContext parseContext = new ParseContext();
- try {
- parser.parse(stream, handler, metadata, parseContext);
- } catch (SAXException e) {
- LOG.warn("problem:" + fetchKey.getFetchKey(), e);
- } catch (EncryptedDocumentException e) {
- LOG.warn("encrypted:" + fetchKey.getFetchKey(), e);
- } catch (SecurityException e) {
- LOG.warn("security exception: " + fetchKey.getFetchKey());
- throw e;
- } catch (Exception e) {
- LOG.warn("exception: " + fetchKey.getFetchKey());
- } catch (OutOfMemoryError e) {
- LOG.error("oom: " + fetchKey.getFetchKey());
- throw e;
- }
- return handler.getMetadataList();
- }
-
- private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
- for (String n : userMetadata.names()) {
- //overwrite whatever was there
- metadataList.get(0).set(n, null);
- for (String val : userMetadata.getValues(n)) {
- metadataList.get(0).add(n, val);
- }
- }
- }
- }
-
- /**
- * Blocks on a single read from the forking process's stream and returns 1 once
- * that read completes (a byte or -1); the read may instead throw an IOException.
- */
- private static class ForkWatcher implements Callable<Integer> {
- private final InputStream in;
-
- public ForkWatcher(InputStream in) {
- this.in = in;
- }
-
- @Override
- public Integer call() throws Exception {
- //this should block forever
- //if the forking process dies,
- // this will either throw an IOException or read -1.
- //the value read is irrelevant: returning at all is the signal
- in.read();
- LOG.info("forking process notified forked to shutdown");
- return 1;
- }
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties b/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
deleted file mode 100644
index 43553a4..0000000
--- a/tika-pipes/tika-pipes-async/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#info,debug, error,fatal ...
-log4j.rootLogger=debug,stderr
-#console
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
deleted file mode 100644
index 5576f73..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-import org.apache.tika.config.Field;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.serialization.JsonMetadataSerializer;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.TikaEmitterException;
-
-/**
- * Test {@link Emitter} that writes each {@link EmitData} as a JSON string into an
- * {@code emitted (emitkey, json)} table reached through the configured JDBC url.
- * Assumes that table already exists; it is not created here.
- */
-public class MockEmitter implements Initializable, Emitter {
- private final ObjectMapper objectMapper = new ObjectMapper();
- private Connection connection;
- private String jdbc;
- private PreparedStatement insert;
-
- /**
- * Registers JsonMetadataSerializer so Metadata inside EmitData serializes as JSON.
- */
- public MockEmitter() {
- SimpleModule module = new SimpleModule();
- module.addSerializer(Metadata.class, new JsonMetadataSerializer());
- objectMapper.registerModule(module);
- }
-
- /**
- * @param jdbc JDBC connection url opened in initialize; presumably set from the
- *             tika-config via the @Field annotation -- confirm
- */
- @Field
- public void setJdbc(String jdbc) {
- this.jdbc = jdbc;
- }
-
- @Override
- public String getName() {
- return "mock";
- }
-
- /**
- * Wraps a single document in EmitData (using this emitter's name) and delegates
- * to the batch emit.
- */
- @Override
- public void emit(String emitKey, List<Metadata> metadataList)
- throws IOException, TikaEmitterException {
- emit(Collections
- .singletonList(new EmitData(new EmitKey(getName(), emitKey), metadataList)));
- }
-
- /**
- * Inserts one row per EmitData: its emit key and its JSON serialization.
- *
- * @throws TikaEmitterException wrapping any SQLException from the insert
- */
- @Override
- public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
- for (EmitData d : emitData) {
- String json = objectMapper.writeValueAsString(d);
- try {
- insert.clearParameters();
- insert.setString(1, d.getEmitKey().getEmitKey());
- insert.setString(2, json);
- insert.execute();
- } catch (SQLException e) {
- throw new TikaEmitterException("problem inserting", e);
- }
- }
- }
-
- /**
- * Opens the JDBC connection and prepares the insert statement.
- * NOTE(review): the connection is never closed by this class.
- *
- * @throws TikaConfigException wrapping any SQLException
- */
- @Override
- public void initialize(Map<String, Param> params) throws TikaConfigException {
- try {
- connection = DriverManager.getConnection(jdbc);
- String sql = "insert into emitted (emitkey, json) values (?, ?)";
- insert = connection.prepareStatement(sql);
- } catch (SQLException e) {
- throw new TikaConfigException("problem w connection", e);
- }
- }
-
- //no initialization checks are performed
- @Override
- public void checkInitialization(InitializableProblemHandler problemHandler)
- throws TikaConfigException {
-
- }
-}
diff --git a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java b/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
deleted file mode 100644
index ebbd8f0..0000000
--- a/tika-pipes/tika-pipes-async/src/test/java/org/apache/tika/pipes/async/SerializationTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import static org.junit.Assert.assertEquals;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.junit.Test;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.serialization.JsonMetadataDeserializer;
-
-/**
- * Checks that {@link AsyncData} can be read from JSON by Jackson once a
- * {@link JsonMetadataDeserializer} is registered for {@link Metadata}.
- */
-public class SerializationTest {
-
- /**
- * Deserializes a hand-written AsyncData document and spot-checks the task id,
- * the fetcher name and the size of the metadata list.
- */
- @Test
- public void testBasic() throws Exception {
- String json = "{\"taskId\":49,\"fetchKey\":{\"fetcherName\":\"mock\"," +
- "\"fetchKey\":\"key-48\"},\"emitKey\":{\"emitterName\":\"mock\"," +
- "\"emitKey\":\"emit-48\"},\"onParseException\":\"EMIT\",\"metadataList\":" +
- "[{\"X-TIKA:Parsed-By\":" +
- "\"org.apache.tika.parser.EmptyParser\",\"X-TIKA:parse_time_millis\":" +
- "\"0\",\"X-TIKA:embedded_depth\":\"0\"}]}";
-
- ObjectMapper mapper = new ObjectMapper();
- SimpleModule module = new SimpleModule();
- module.addDeserializer(Metadata.class, new JsonMetadataDeserializer());
- mapper.registerModule(module);
- AsyncData asyncData = mapper.readValue(json, AsyncData.class);
- assertEquals(49, asyncData.getTaskId());
- assertEquals("mock", asyncData.getFetchKey().getFetcherName());
- assertEquals(1, asyncData.getMetadataList().size());
- }
-
-}
diff --git a/tika-server/tika-server-core/pom.xml b/tika-server/tika-server-core/pom.xml
index 3ed9dd6..5b60a59 100644
--- a/tika-server/tika-server-core/pom.xml
+++ b/tika-server/tika-server-core/pom.xml
@@ -36,7 +36,6 @@
</pluginRepositories>
<dependencies>
-
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tika-translate</artifactId>
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
index e5a4921..4081102 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerCli.java
@@ -141,9 +141,10 @@ public class TikaServerCli {
return ret.toArray(new String[0]);
}
+ /**
+ * Runs the server in this JVM by calling TikaServerProcess.main directly instead of
+ * spawning a child process. {@code -noFork} is appended to the arguments so the
+ * process-side option parsing and config loading see no-fork mode.
+ */
- public static void noFork(TikaServerConfig tikaServerConfig) {
+ public static void noFork(TikaServerConfig tikaServerConfig) throws Exception {
List<String> args = tikaServerConfig
.getForkedProcessArgs(tikaServerConfig.getPort(), tikaServerConfig.getIdBase());
+ args.add("-noFork");
TikaServerProcess.main(args.toArray(new String[0]));
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
index e12dd37..1cba4cc 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
@@ -130,7 +130,7 @@ public class TikaServerConfig {
TikaServerConfig config = null;
Set<String> settings = new HashSet<>();
if (commandLine.hasOption("c")) {
- config = load(Paths.get(commandLine.getOptionValue("c")), settings);
+ config = load(Paths.get(commandLine.getOptionValue("c")), commandLine, settings);
config.setConfigPath(commandLine.getOptionValue("c"));
} else {
config = new TikaServerConfig();
@@ -175,13 +175,14 @@ public class TikaServerConfig {
return config;
}
+ /**
+ * Opens the config file at {@code p} and delegates to
+ * {@code load(InputStream, CommandLine, Set)}, closing the stream afterwards.
+ */
- static TikaServerConfig load(Path p, Set<String> settings) throws IOException, TikaException {
+ static TikaServerConfig load(Path p, CommandLine commandLine, Set<String> settings) throws IOException,
+ TikaException {
try (InputStream is = Files.newInputStream(p)) {
- return TikaServerConfig.load(is, settings);
+ return TikaServerConfig.load(is, commandLine, settings);
}
}
- static TikaServerConfig load(InputStream is, Set<String> settings)
+ static TikaServerConfig load(InputStream is, CommandLine commandLine, Set<String> settings)
throws IOException, TikaException {
Node properties = null;
try {
@@ -201,6 +202,10 @@ public class TikaServerConfig {
loadServerConfig(child, config, settings);
}
}
+ //override a few things in config file
+ if (commandLine.hasOption("noFork")) {
+ config.setNoFork(true);
+ }
return config;
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 390d731..6f30eac 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -27,12 +27,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -57,10 +51,6 @@ import org.apache.tika.config.TikaConfig;
import org.apache.tika.parser.DigestingParser;
import org.apache.tika.parser.digestutils.BouncyCastleDigester;
import org.apache.tika.parser.digestutils.CommonsDigester;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.server.core.resource.AsyncEmitter;
-import org.apache.tika.server.core.resource.AsyncParser;
import org.apache.tika.server.core.resource.AsyncResource;
import org.apache.tika.server.core.resource.DetectorResource;
import org.apache.tika.server.core.resource.EmitterResource;
@@ -102,6 +92,7 @@ public class TikaServerProcess {
options.addOption("i", "id", true,
"id to use for server in server status endpoint");
options.addOption("?", "help", false, "this help message");
+ options.addOption("noFork", false, "if launched in no fork mode");
options.addOption("forkedStatusFile", true,
"Not allowed in -noFork: temporary file used to communicate " +
"with forking process -- do not use this! " +
@@ -114,44 +105,40 @@ public class TikaServerProcess {
return options;
}
+ /**
+ * Server process entry point: parses the command line, loads the server config,
+ * creates the async resource (with a shutdown hook that closes it) when unsecure
+ * features are enabled, then initializes and starts the server. Exceptions thrown
+ * during startup are logged and the JVM exits with -1.
+ */
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
LOG.info("Starting {} server", new Tika());
+ AsyncResource asyncResource = null;
try {
Options options = getOptions();
CommandLineParser cliParser = new DefaultParser();
CommandLine line = cliParser.parse(options, args);
TikaServerConfig tikaServerConfig = TikaServerConfig.load(line);
LOG.debug("forked config: {}", tikaServerConfig);
- mainLoop(tikaServerConfig);
+ if (tikaServerConfig.isEnableUnsecureFeatures()) {
+ final AsyncResource localAsyncResource =
+ new AsyncResource(tikaServerConfig.getConfigPath());
+ //close the async processor when the JVM shuts down
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ localAsyncResource.shutdownNow();
+ } catch (Exception e) {
+ LOG.warn("problem shutting down async resource", e);
+ }
+ }));
+ asyncResource = localAsyncResource;
+ }
+ ServerDetails serverDetails = initServer(tikaServerConfig, asyncResource);
+ startServer(serverDetails);
} catch (Exception e) {
LOG.error("Can't start: ", e);
System.exit(-1);
}
}
- private static void mainLoop(TikaServerConfig tikaServerConfig) throws Exception {
- AsyncResource asyncResource = null;
- ArrayBlockingQueue<FetchEmitTuple> asyncFetchEmitQueue = null;
- ArrayBlockingQueue<EmitData> asyncEmitData = null;
- int numAsyncParserThreads = 10;
- if (tikaServerConfig.isEnableUnsecureFeatures()) {
- asyncResource = new AsyncResource();
- asyncFetchEmitQueue = asyncResource.getFetchEmitQueue(10000);
- asyncEmitData = asyncResource.getEmitDataQueue(1000);
- }
-
- ServerDetails serverDetails = initServer(tikaServerConfig, asyncResource);
- ExecutorService executorService = Executors.newFixedThreadPool(numAsyncParserThreads + 1);
- ExecutorCompletionService<Integer> executorCompletionService =
- new ExecutorCompletionService<>(executorService);
+ private static void startServer(ServerDetails serverDetails) throws Exception {
- if (asyncFetchEmitQueue != null) {
- executorCompletionService.submit(new AsyncEmitter(asyncEmitData));
- for (int i = 0; i < numAsyncParserThreads; i++) {
- executorCompletionService
- .submit(new AsyncParser(asyncFetchEmitQueue, asyncEmitData));
- }
- }
try {
//start the server
Server server = serverDetails.sf.create();
@@ -159,15 +146,7 @@ public class TikaServerProcess {
LOG.warn("exception starting server", e);
System.exit(DO_NOT_RESTART_EXIT_VALUE);
}
-
LOG.info("Started Apache Tika server {} at {}", serverDetails.serverId, serverDetails.url);
-
- while (true) {
- Future<Integer> future = executorCompletionService.poll(1, TimeUnit.MINUTES);
- if (future != null) {
- LOG.warn("Daemon should not stop: " + future.get());
- }
- }
}
//This returns the server, configured and ready to be started.
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
deleted file mode 100644
index 773297d..0000000
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncParser.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.server.core.resource;
-
-import java.io.InputStream;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import javax.ws.rs.core.MultivaluedHashMap;
-
-import org.apache.cxf.jaxrs.impl.UriInfoImpl;
-import org.apache.cxf.message.MessageImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.utils.StringUtils;
-
-/**
- * Worker thread that takes {@link FetchEmitTuple} off the queue, parses
- * the file and puts the {@link EmitData} on the emitDataQueue for the {@link AsyncEmitter}.
- * <p>
- * A tuple whose fetch or parse fails outright is logged and skipped rather than
- * allowed to throw, so one bad document does not end this worker's loop.
- */
-public class AsyncParser implements Callable<Integer> {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncParser.class);
-
- private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTupleQueue;
- private final ArrayBlockingQueue<EmitData> emitDataQueue;
-
- public AsyncParser(ArrayBlockingQueue<FetchEmitTuple> queue,
- ArrayBlockingQueue<EmitData> emitData) {
- this.fetchEmitTupleQueue = queue;
- this.emitDataQueue = emitData;
- }
-
- /**
- * Loops forever: polls for a tuple (one minute at a time), parses it and offers
- * the result to the emit queue, waiting up to ten minutes for space.
- *
- * @return never returns normally
- * @throws InterruptedException if interrupted while waiting on either queue
- * @throws SecurityException if one is raised while processing a tuple
- */
- @Override
- public Integer call() throws Exception {
- while (true) {
- FetchEmitTuple request = fetchEmitTupleQueue.poll(1, TimeUnit.MINUTES);
- if (request != null) {
- EmitData emitData = processTuple(request);
- boolean shouldEmit = checkForParseException(request, emitData);
- if (shouldEmit) {
- boolean offered = emitDataQueue.offer(emitData, 10, TimeUnit.MINUTES);
- if (!offered) {
- //TODO: deal with this
- LOG.warn("Failed to add ({}) " + "to emit queue after 10 minutes.",
- request.getFetchKey().getFetchKey());
- }
- }
- } else {
- LOG.trace("Nothing on the async queue");
- }
- }
- }
-
- /**
- * Logs container and embedded parse exceptions and decides whether to emit.
- *
- * @param emitData parse result; may be {@code null} if processing failed
- * @return {@code false} for null/empty emit data, or when the container threw and
- *         the tuple's policy is {@code ON_PARSE_EXCEPTION.SKIP}; {@code true} otherwise
- */
- private boolean checkForParseException(FetchEmitTuple request, EmitData emitData) {
- if (emitData == null || emitData.getMetadataList() == null ||
- emitData.getMetadataList().size() == 0) {
- LOG.warn("empty or null emit data ({})", request.getFetchKey().getFetchKey());
- return false;
- }
- boolean shouldEmit = true;
- Metadata container = emitData.getMetadataList().get(0);
- String stack = container.get(TikaCoreProperties.CONTAINER_EXCEPTION);
- if (stack != null) {
- LOG.warn("fetchKey ({}) container parse exception ({})",
- request.getFetchKey().getFetchKey(),
- stack);
- if (request.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP) {
- shouldEmit = false;
- }
- }
-
- for (int i = 1; i < emitData.getMetadataList().size(); i++) {
- Metadata m = emitData.getMetadataList().get(i);
- String embeddedStack = m.get(TikaCoreProperties.EMBEDDED_EXCEPTION);
- if (embeddedStack != null) {
- LOG.warn("fetchKey ({}) embedded parse exception ({})",
- request.getFetchKey().getFetchKey(), embeddedStack);
- }
- }
- return shouldEmit;
- }
-
- /**
- * Fetches and parses one tuple, overlays its user metadata and defaults a blank
- * emit key to the fetch key.
- *
- * @return the emit data, or {@code null} if the fetch or parse failed before any
- *         metadata list was produced (the failure is logged)
- */
- private EmitData processTuple(FetchEmitTuple t) {
- Metadata userMetadata = t.getMetadata();
- Metadata metadata = new Metadata();
- String fetcherName = t.getFetchKey().getFetcherName();
- String fetchKey = t.getFetchKey().getFetchKey();
- List<Metadata> metadataList = null;
- try (InputStream stream = TikaResource.getConfig().getFetcherManager()
- .getFetcher(fetcherName).fetch(fetchKey, metadata)) {
- metadataList = RecursiveMetadataResource
- .parseMetadata(stream, metadata, new MultivaluedHashMap<>(),
- new UriInfoImpl(new MessageImpl()), t.getHandlerConfig());
- } catch (SecurityException e) {
- //rethrow before the generic catch below can swallow it
- throw e;
- } catch (Exception e) {
- LOG.warn(t.toString(), e);
- }
- if (metadataList == null) {
- //nothing to emit; injecting user metadata here would throw an NPE and
- //end the call() loop. checkForParseException treats null as "skip".
- return null;
- }
-
- injectUserMetadata(userMetadata, metadataList);
- EmitKey emitKey = t.getEmitKey();
- if (StringUtils.isBlank(emitKey.getEmitKey())) {
- emitKey = new EmitKey(emitKey.getEmitterName(), fetchKey);
- }
- return new EmitData(emitKey, metadataList);
- }
-
- /**
- * Overwrites fields of the container document's metadata (first list entry) with
- * every field in {@code userMetadata}. Does nothing when the list is empty.
- */
- private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) {
- if (metadataList.isEmpty()) {
- return;
- }
- Metadata container = metadataList.get(0);
- for (String n : userMetadata.names()) {
- //overwrite whatever was there
- container.set(n, null);
- for (String val : userMetadata.getValues(n)) {
- container.add(n, val);
- }
- }
- }
-}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index dea21c4..023291f 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -23,13 +23,9 @@ import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -40,11 +36,14 @@ import javax.ws.rs.core.UriInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.async.AsyncProcessor;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.EmitterManager;
@@ -58,8 +57,14 @@ public class AsyncResource {
private static final int DEFAULT_FETCH_EMIT_QUEUE_SIZE = 10000;
long maxQueuePauseMs = 60000;
+ private final AsyncProcessor asyncProcessor;
private ArrayBlockingQueue<FetchEmitTuple> queue;
+ /**
+ * Creates the resource and the {@link AsyncProcessor} to which incoming
+ * fetch/emit tuples are offered.
+ *
+ * @param tikaConfigPath tika-config file passed to the {@link AsyncProcessor}
+ * @throws TikaException if thrown while constructing the {@link AsyncProcessor}
+ * @throws IOException if thrown while constructing the {@link AsyncProcessor}
+ * @throws SAXException if thrown while constructing the {@link AsyncProcessor}
+ */
+ public AsyncResource(java.nio.file.Path tikaConfigPath)
+ throws TikaException, IOException, SAXException {
+ this.asyncProcessor = new AsyncProcessor(tikaConfigPath);
+ }
+
public ArrayBlockingQueue<FetchEmitTuple> getFetchEmitQueue(int queueSize) {
this.queue = new ArrayBlockingQueue<>(queueSize);
return queue;
@@ -106,26 +111,12 @@ public class AsyncResource {
}
}
Instant start = Instant.now();
- long elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
- List<FetchEmitTuple> notAdded = new ArrayList<>();
- int addedCount = 0;
- for (FetchEmitTuple t : request.getTuples()) {
- boolean offered = false;
- while (!offered && elapsed < maxQueuePauseMs) {
- offered = queue.offer(t, 10, TimeUnit.MILLISECONDS);
- elapsed = ChronoUnit.MILLIS.between(start, Instant.now());
- }
- if (!offered) {
- notAdded.add(t);
- } else {
- addedCount++;
- }
+ boolean offered = asyncProcessor.offer(request.getTuples(), maxQueuePauseMs);
+ if (offered) {
+ return ok(request.getTuples().size());
+ } else {
+ return throttle(request.getTuples().size());
}
-
- if (notAdded.size() > 0) {
- return throttle(notAdded, addedCount);
- }
- return ok(request.getTuples().size());
}
private Map<String, Object> ok(int size) {
@@ -135,16 +126,10 @@ public class AsyncResource {
return map;
}
+ /**
+ * Builds the response body returned when {@code AsyncProcessor#offer} does not
+ * accept the request.
+ *
+ * @param requestSize number of tuples in the rejected request
+ * @return a map with {@code status} = "throttled" and a human-readable {@code msg}
+ */
- private Map<String, Object> throttle(List<FetchEmitTuple> notAdded, int added) {
- List<String> fetchKeys = new ArrayList<>();
- for (FetchEmitTuple t : notAdded) {
- fetchKeys.add(t.getFetchKey().getFetchKey());
- }
+ private Map<String, Object> throttle(int requestSize) {
Map<String, Object> map = new HashMap<>();
map.put("status", "throttled");
- map.put("added", added);
- map.put("skipped", notAdded.size());
- map.put("skippedFetchKeys", fetchKeys);
+ map.put("msg", "not able to receive request of size " + requestSize + " at this time");
return map;
}
@@ -162,4 +147,8 @@ public class AsyncResource {
}
}
+ /**
+ * Closes the underlying {@link AsyncProcessor}.
+ *
+ * @throws Exception whatever {@code AsyncProcessor#close()} throws
+ */
+ public void shutdownNow() throws Exception {
+ asyncProcessor.close();
+ }
+
}
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
index 44547c5..540e25f 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerAsyncIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.tika.server.core;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
@@ -53,7 +54,7 @@ import org.apache.tika.pipes.fetcher.FetchKey;
public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
private static final Logger LOG = LoggerFactory.getLogger(TikaServerAsyncIntegrationTest.class);
- private static final int NUM_FILES = 1000;
+ private static final int NUM_FILES = 450;
private static final String EMITTER_NAME = "fse";
private static final String FETCHER_NAME = "fsf";
private static FetchEmitTuple.ON_PARSE_EXCEPTION ON_PARSE_EXCEPTION =
@@ -63,7 +64,9 @@ public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
private static String TIKA_CONFIG_XML;
private static Path TIKA_CONFIG;
private static List<String> FILE_LIST = new ArrayList<>();
- private static String[] FILES = new String[]{"hello_world.xml", "null_pointer.xml"
+ private static String[] FILES = new String[]{
+ "hello_world.xml",
+ "null_pointer.xml"
// "heavy_hang_30000.xml", "real_oom.xml", "system_exit.xml"
};
@@ -131,21 +134,31 @@ public class TikaServerAsyncIntegrationTest extends IntegrationTestBase {
Thread serverThread = new Thread() {
@Override
public void run() {
- TikaServerCli.main(new String[]{"-p", INTEGRATION_TEST_PORT, "-config",
+ TikaServerCli.main(new String[]{
+ //for debugging/development, use no fork; otherwise go with the default
+ //"-noFork",
+ "-p", INTEGRATION_TEST_PORT, "-config",
TIKA_CONFIG.toAbsolutePath().toString()});
}
};
serverThread.start();
try {
+ long start = System.currentTimeMillis();
+
JsonNode response = sendAsync(FILE_LIST);
+ String status = response.get("status").asText();
+ if (! "ok".equals(status)) {
+ fail("bad status: '" + status + "' -> " + response.toPrettyString());
+ }
int expected = (ON_PARSE_EXCEPTION == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) ?
FILE_LIST.size() : FILE_LIST.size() / 2;
int targets = 0;
while (targets < FILE_LIST.size()) {
targets = countTargets();
- Thread.sleep(1000);
+ Thread.sleep(100);
}
+ System.out.println("elapsed : " + (System.currentTimeMillis() - start));
} finally {
serverThread.interrupt();
}
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
index 2cb2a2c..a67788d 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
@@ -22,6 +22,10 @@ import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
import org.junit.Test;
import org.apache.tika.config.TikaConfigTest;
@@ -31,8 +35,11 @@ public class TikaServerConfigTest {
@Test
public void testBasic() throws Exception {
Set<String> settings = new HashSet<>();
+ CommandLineParser parser = new DefaultParser();
+ CommandLine emptyCommandLine = parser.parse(new Options(), new String[]{});
TikaServerConfig config = TikaServerConfig
.load(TikaConfigTest.class.getResourceAsStream("/configs/tika-config-server.xml"),
+ emptyCommandLine,
settings);
assertEquals(-1, config.getMaxRestarts());
assertEquals(54321, config.getTaskTimeoutMillis());