You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/28 20:03:37 UTC
[tika] 02/02: TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve documentation and remove the unused AsyncClientConfig.java
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
commit ead2e752cee9cefaa4ec44aed70b2c77f5732bbe
Author: tallison <ta...@apache.org>
AuthorDate: Wed Jul 28 16:02:42 2021 -0400
TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve documentation and remove the unused AsyncClientConfig.java
---
.../java/org/apache/tika/pipes/PipesClient.java | 13 +---
.../org/apache/tika/pipes/PipesConfigBase.java | 75 +++++++++++++++++++---
.../java/org/apache/tika/pipes/PipesServer.java | 24 +++----
.../apache/tika/pipes/async/AsyncClientConfig.java | 34 ----------
.../org/apache/tika/pipes/async/AsyncConfig.java | 17 -----
5 files changed, 81 insertions(+), 82 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index cd070c7..46248f0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.tika.pipes.async.AsyncConfig;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.utils.ProcessUtils;
@@ -101,8 +100,8 @@ public class PipesClient implements Closeable {
if (! ping()) {
restart();
}
- if (pipesConfig.getMaxFilesProcessed() > 0 &&
- filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
+ if (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
+ filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess()) {
LOG.info("restarting server after hitting max files: " + filesProcessed);
restart();
}
@@ -356,13 +355,7 @@ public class PipesClient implements Closeable {
commandLine.add(
ProcessUtils.escapeCommandLine(pipesConfig.getTikaConfig().toAbsolutePath().toString()));
- //turn off emit batching
- String maxForEmitBatchBytes = "0";
- if (pipesConfig instanceof AsyncConfig) {
- maxForEmitBatchBytes =
- Long.toString(((AsyncConfig)pipesConfig).getMaxForEmitBatchBytes());
- }
- commandLine.add(maxForEmitBatchBytes);
+ commandLine.add(Long.toString(pipesConfig.getMaxForEmitBatchBytes()));
commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
LOG.debug("commandline: " + commandLine);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index e0ad8d9..f48e7fd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -25,12 +25,35 @@ import org.apache.tika.config.ConfigBase;
public class PipesConfigBase extends ConfigBase {
- private long timeoutMillis = 30000;
- private long startupTimeoutMillis = 240000;
- private long shutdownClientAfterMillis = 300000;
- private int numClients = 10;
+ /**
+ * Default maximum size in bytes of an extract to send back to the PipesClient for batch
+ * emitting. If an extract is larger than this, it will be emitted
+ * directly from the forked PipesServer.
+ */
+ public static final long DEFAULT_MAX_FOR_EMIT_BATCH = 100000;
+
+ public static final long DEFAULT_TIMEOUT_MILLIS = 60000;
+
+ public static final long DEFAULT_STARTUP_TIMEOUT_MILLIS = 240000;
+
+ public static final long DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS = 300000;
+
+ public static final int DEFAULT_NUM_CLIENTS = 4;
+
+ public static final int DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS = 10000;
+
+ //if an extract is larger than this, the forked PipesServer should
+ //emit the extract directly and not send the contents back to the PipesClient
+ private long maxForEmitBatchBytes = DEFAULT_MAX_FOR_EMIT_BATCH;
+ private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS;
+ private long startupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
+
+ private long shutdownClientAfterMillis = DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS;
+ private int numClients = DEFAULT_NUM_CLIENTS;
+
+ private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;
+
private List<String> forkedJvmArgs = new ArrayList<>();
- private int maxFilesProcessed = 10000;
private Path tikaConfig;
private String javaPath = "java";
@@ -38,6 +61,10 @@ public class PipesConfigBase extends ConfigBase {
return timeoutMillis;
}
+ /**
+ * How long to wait in milliseconds before timing out the forked process.
+ * @param timeoutMillis
+ */
public void setTimeoutMillis(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
@@ -46,6 +73,12 @@ public class PipesConfigBase extends ConfigBase {
return shutdownClientAfterMillis;
}
+ /**
+ * If the client has been inactive for this many milliseconds,
+ * shut it down.
+ *
+ * @param shutdownClientAfterMillis
+ */
public void setShutdownClientAfterMillis(long shutdownClientAfterMillis) {
this.shutdownClientAfterMillis = shutdownClientAfterMillis;
}
@@ -66,12 +99,17 @@ public class PipesConfigBase extends ConfigBase {
this.forkedJvmArgs = jvmArgs;
}
- public int getMaxFilesProcessed() {
- return maxFilesProcessed;
+ /**
+ * Restart the forked PipesServer after it has processed this many files to avoid
+ * slow-building memory leaks.
+ * @return
+ */
+ public int getMaxFilesProcessedPerProcess() {
+ return maxFilesProcessedPerProcess;
}
- public void setMaxFilesProcessed(int maxFilesProcessed) {
- this.maxFilesProcessed = maxFilesProcessed;
+ public void setMaxFilesProcessedPerProcess(int maxFilesProcessedPerProcess) {
+ this.maxFilesProcessedPerProcess = maxFilesProcessedPerProcess;
}
public Path getTikaConfig() {
@@ -97,4 +135,23 @@ public class PipesConfigBase extends ConfigBase {
public long getStartupTimeoutMillis() {
return startupTimeoutMillis;
}
+
+ /**
+ * The maximum size in bytes per extract that
+ * will be allowed to be shipped back to the emit queue in the forking process.
+ * If an extract is too big, skip the emit queue and forward it directly from the
+ * forked PipesServer.
+ * If set to <code>0</code>, this will never send an extract back for batch emitting,
+ * but will always emit the extract directly from the forked PipesServer.
+ * If set to <code>-1</code>, this will always send the extract back for batch emitting.
+ *
+ * @return the threshold extract size at which to emit directly from the forked PipesServer
+ */
+ public long getMaxForEmitBatchBytes() {
+ return maxForEmitBatchBytes;
+ }
+
+ public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
+ this.maxForEmitBatchBytes = maxForEmitBatchBytes;
+ }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 68b304f..ff0d41d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -117,7 +117,10 @@ public class PipesServer implements Runnable {
private final Path tikaConfigPath;
private final DataInputStream input;
private final DataOutputStream output;
- private final long maxExtractSizeToReturn;
+ //if an extract is this size or larger, emit it directly;
+ //if it is smaller than this value (or this value is negative), write it back to the
+ //PipesClient so that it can cache the extracts and then batch emit.
+ private final long maxForEmitBatchBytes;
private final long serverParseTimeoutMillis;
private final long serverWaitTimeoutMillis;
private Parser autoDetectParser;
@@ -128,17 +131,15 @@ public class PipesServer implements Runnable {
private volatile boolean parsing;
private volatile long since;
- //logging is fussy...the logging frameworks grab stderr and stdout
- //before we can redirect. slf4j complains on stderr, log4j2 unconfigured writes to stdout
- //We can add logging later but it has to be done carefully...
+
public PipesServer(Path tikaConfigPath, InputStream in, PrintStream out,
- long maxExtractSizeToReturn,
+ long maxForEmitBatchBytes,
long serverParseTimeoutMillis, long serverWaitTimeoutMillis)
throws IOException, TikaException, SAXException {
this.tikaConfigPath = tikaConfigPath;
this.input = new DataInputStream(in);
this.output = new DataOutputStream(out);
- this.maxExtractSizeToReturn = maxExtractSizeToReturn;
+ this.maxForEmitBatchBytes = maxForEmitBatchBytes;
this.serverParseTimeoutMillis = serverParseTimeoutMillis;
this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
this.parsing = false;
@@ -334,14 +335,14 @@ public class PipesServer implements Runnable {
t.setEmitKey(emitKey);
}
EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
- if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
+ if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
emit(t.getId(), emitData, stack);
} else {
- write(emitData, stack);
+ //ignore the stack, it is stored in the emit data
+ write(emitData);
}
} else {
- write(STATUS.PARSE_EXCEPTION_NO_EMIT,
- stack.getBytes(StandardCharsets.UTF_8));
+ write(STATUS.PARSE_EXCEPTION_NO_EMIT, stack);
}
}
@@ -509,8 +510,7 @@ public class PipesServer implements Runnable {
}
- private void write(EmitData emitData, String stack) {
- //TODO -- what do we do with the stack?
+ private void write(EmitData emitData) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
deleted file mode 100644
index 1fe5159..0000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-class AsyncClientConfig {
-
- private int fetchQueueSize = 20000;
- private long waitTimeoutMs;
- private long maxFilesProcessed;
-
- public static AsyncClientConfig load(Path p) throws IOException {
-
- return new AsyncClientConfig();
- }
-
-
-}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index fa4a1b0..c2d5ed8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -29,7 +29,6 @@ public class AsyncConfig extends PipesConfigBase {
private long emitWithinMillis = 10000;
private long emitMaxEstimatedBytes = 100000;
- private long maxForEmitBatchBytes = 0;
private int queueSize = 10000;
private int numEmitters = 1;
@@ -73,22 +72,6 @@ public class AsyncConfig extends PipesConfigBase {
}
- /**
- * What is the maximum bytes size per extract that
- * will be allowed in the emit queue. If an extract is too
- * big, skip the emit queue and forward it directly from the processor. If
- * set to <code>0</code>, this will never send an extract back for batch emitting,
- * but will emit the extract directly from the processor.
- * @return
- */
- public long getMaxForEmitBatchBytes() {
- return maxForEmitBatchBytes;
- }
-
- public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
- this.maxForEmitBatchBytes = maxForEmitBatchBytes;
- }
-
public void setNumEmitters(int numEmitters) {
this.numEmitters = numEmitters;
}