You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/28 20:03:37 UTC

[tika] 02/02: TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve documentation and remove the unused AsyncClientConfig.java

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit ead2e752cee9cefaa4ec44aed70b2c77f5732bbe
Author: tallison <ta...@apache.org>
AuthorDate: Wed Jul 28 16:02:42 2021 -0400

    TIKA-3505 -- move maxemitbatchbytes to PipesConfigBase, improve documentation and remove the unused AsyncClientConfig.java
---
 .../java/org/apache/tika/pipes/PipesClient.java    | 13 +---
 .../org/apache/tika/pipes/PipesConfigBase.java     | 75 +++++++++++++++++++---
 .../java/org/apache/tika/pipes/PipesServer.java    | 24 +++----
 .../apache/tika/pipes/async/AsyncClientConfig.java | 34 ----------
 .../org/apache/tika/pipes/async/AsyncConfig.java   | 17 -----
 5 files changed, 81 insertions(+), 82 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index cd070c7..46248f0 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -44,7 +44,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.tika.pipes.async.AsyncConfig;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.utils.ProcessUtils;
 
@@ -101,8 +100,8 @@ public class PipesClient implements Closeable {
         if (! ping()) {
             restart();
         }
-        if (pipesConfig.getMaxFilesProcessed() > 0 &&
-                filesProcessed >= pipesConfig.getMaxFilesProcessed()) {
+        if (pipesConfig.getMaxFilesProcessedPerProcess() > 0 &&
+                filesProcessed >= pipesConfig.getMaxFilesProcessedPerProcess()) {
             LOG.info("restarting server after hitting max files: " + filesProcessed);
             restart();
         }
@@ -356,13 +355,7 @@ public class PipesClient implements Closeable {
         commandLine.add(
                 ProcessUtils.escapeCommandLine(pipesConfig.getTikaConfig().toAbsolutePath().toString()));
 
-        //turn off emit batching
-        String maxForEmitBatchBytes = "0";
-        if (pipesConfig instanceof AsyncConfig) {
-            maxForEmitBatchBytes =
-                    Long.toString(((AsyncConfig)pipesConfig).getMaxForEmitBatchBytes());
-        }
-        commandLine.add(maxForEmitBatchBytes);
+        commandLine.add(Long.toString(pipesConfig.getMaxForEmitBatchBytes()));
         commandLine.add(Long.toString(pipesConfig.getTimeoutMillis()));
         commandLine.add(Long.toString(pipesConfig.getShutdownClientAfterMillis()));
         LOG.debug("commandline: " + commandLine);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index e0ad8d9..f48e7fd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -25,12 +25,35 @@ import org.apache.tika.config.ConfigBase;
 
 public class PipesConfigBase extends ConfigBase {
 
-    private long timeoutMillis = 30000;
-    private long startupTimeoutMillis = 240000;
-    private long shutdownClientAfterMillis = 300000;
-    private int numClients = 10;
+    /**
+     * default size to send back to the PipesClient for batch
+     * emitting.  If an extract is larger than this, it will be emitted
+     * directly from the forked PipesServer.
+     */
+    public static final long DEFAULT_MAX_FOR_EMIT_BATCH = 100000;
+
+    public static final long DEFAULT_TIMEOUT_MILLIS = 60000;
+
+    public static final long DEFAULT_STARTUP_TIMEOUT_MILLIS = 240000;
+
+    public static final long DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS = 300000;
+
+    public static final int DEFAULT_NUM_CLIENTS = 4;
+
+    public static final int DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS = 10000;
+
+    //if an extract is larger than this, the forked PipesServer should
+    //emit the extract directly and not send the contents back to the PipesClient
+    private long maxForEmitBatchBytes = DEFAULT_MAX_FOR_EMIT_BATCH;
+    private long timeoutMillis = DEFAULT_TIMEOUT_MILLIS;
+    private long startupTimeoutMillis = DEFAULT_STARTUP_TIMEOUT_MILLIS;
+
+    private long shutdownClientAfterMillis = DEFAULT_SHUTDOWN_CLIENT_AFTER_MILLS;
+    private int numClients = DEFAULT_NUM_CLIENTS;
+
+    private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;
+
     private List<String> forkedJvmArgs = new ArrayList<>();
-    private int maxFilesProcessed = 10000;
     private Path tikaConfig;
     private String javaPath = "java";
 
@@ -38,6 +61,10 @@ public class PipesConfigBase extends ConfigBase {
         return timeoutMillis;
     }
 
+    /**
+     * How long to wait in milliseconds before timing out the forked process.
+     * @param timeoutMillis
+     */
     public void setTimeoutMillis(long timeoutMillis) {
         this.timeoutMillis = timeoutMillis;
     }
@@ -46,6 +73,12 @@ public class PipesConfigBase extends ConfigBase {
         return shutdownClientAfterMillis;
     }
 
+    /**
+     * If the client has been inactive for this many milliseconds,
+     * shut it down.
+     *
+     * @param shutdownClientAfterMillis
+     */
     public void setShutdownClientAfterMillis(long shutdownClientAfterMillis) {
         this.shutdownClientAfterMillis = shutdownClientAfterMillis;
     }
@@ -66,12 +99,17 @@ public class PipesConfigBase extends ConfigBase {
         this.forkedJvmArgs = jvmArgs;
     }
 
-    public int getMaxFilesProcessed() {
-        return maxFilesProcessed;
+    /**
+     * Restart the forked PipesServer after it has processed this many files to avoid
+     * slow-building memory leaks.
+     * @return
+     */
+    public int getMaxFilesProcessedPerProcess() {
+        return maxFilesProcessedPerProcess;
     }
 
-    public void setMaxFilesProcessed(int maxFilesProcessed) {
-        this.maxFilesProcessed = maxFilesProcessed;
+    public void setMaxFilesProcessedPerProcess(int maxFilesProcessedPerProcess) {
+        this.maxFilesProcessedPerProcess = maxFilesProcessedPerProcess;
     }
 
     public Path getTikaConfig() {
@@ -97,4 +135,23 @@ public class PipesConfigBase extends ConfigBase {
     public long getStartupTimeoutMillis() {
         return startupTimeoutMillis;
     }
+
+    /**
+     *  The maximum size in bytes per extract that
+     *  will be allowed to be shipped back to the emit queue in the forking process.
+     *  If an extract is too big, skip the emit queue and forward it directly from the
+     *  forked PipesServer.
+     *  If set to <code>0</code>, this will never send an extract back for batch emitting,
+     *  but will always emit the extract directly from the forked PipesServer.
+     *  If set to <code>-1</code>, this will always send the extract back for batch emitting.
+     *
+     * @return the threshold extract size at which to emit directly from the forked PipesServer
+     */
+    public long getMaxForEmitBatchBytes() {
+        return maxForEmitBatchBytes;
+    }
+
+    public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
+        this.maxForEmitBatchBytes = maxForEmitBatchBytes;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 68b304f..ff0d41d 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -117,7 +117,10 @@ public class PipesServer implements Runnable {
     private final Path tikaConfigPath;
     private final DataInputStream input;
     private final DataOutputStream output;
-    private final long maxExtractSizeToReturn;
+    //if an extract is larger than this value, emit it directly;
+    //if it is smaller than this value, write it back to the
+    //PipesClient so that it can cache the extracts and then batch emit.
+    private final long maxForEmitBatchBytes;
     private final long serverParseTimeoutMillis;
     private final long serverWaitTimeoutMillis;
     private Parser autoDetectParser;
@@ -128,17 +131,15 @@ public class PipesServer implements Runnable {
     private volatile boolean parsing;
     private volatile long since;
 
-    //logging is fussy...the logging frameworks grab stderr and stdout
-    //before we can redirect.  slf4j complains on stderr, log4j2 unconfigured writes to stdout
-    //We can add logging later but it has to be done carefully...
+
     public PipesServer(Path tikaConfigPath, InputStream in, PrintStream out,
-                       long maxExtractSizeToReturn,
+                       long maxForEmitBatchBytes,
                        long serverParseTimeoutMillis, long serverWaitTimeoutMillis)
             throws IOException, TikaException, SAXException {
         this.tikaConfigPath = tikaConfigPath;
         this.input = new DataInputStream(in);
         this.output = new DataOutputStream(out);
-        this.maxExtractSizeToReturn = maxExtractSizeToReturn;
+        this.maxForEmitBatchBytes = maxForEmitBatchBytes;
         this.serverParseTimeoutMillis = serverParseTimeoutMillis;
         this.serverWaitTimeoutMillis = serverWaitTimeoutMillis;
         this.parsing = false;
@@ -334,14 +335,14 @@ public class PipesServer implements Runnable {
                 t.setEmitKey(emitKey);
             }
             EmitData emitData = new EmitData(t.getEmitKey(), metadataList);
-            if (emitData.getEstimatedSizeBytes() >= maxExtractSizeToReturn) {
+            if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
                 emit(t.getId(), emitData, stack);
             } else {
-                write(emitData, stack);
+                //ignore the stack, it is stored in the emit data
+                write(emitData);
             }
         } else {
-            write(STATUS.PARSE_EXCEPTION_NO_EMIT,
-                    stack.getBytes(StandardCharsets.UTF_8));
+            write(STATUS.PARSE_EXCEPTION_NO_EMIT, stack);
         }
 
     }
@@ -509,8 +510,7 @@ public class PipesServer implements Runnable {
     }
 
 
-    private void write(EmitData emitData, String stack) {
-        //TODO -- what do we do with the stack?
+    private void write(EmitData emitData) {
         try {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
deleted file mode 100644
index 1fe5159..0000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncClientConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.async;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-class AsyncClientConfig {
-
-    private int fetchQueueSize = 20000;
-    private long waitTimeoutMs;
-    private long maxFilesProcessed;
-
-    public static AsyncClientConfig load(Path p) throws IOException {
-
-        return new AsyncClientConfig();
-    }
-
-
-}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index fa4a1b0..c2d5ed8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -29,7 +29,6 @@ public class AsyncConfig extends PipesConfigBase {
     private long emitWithinMillis = 10000;
     private long emitMaxEstimatedBytes = 100000;
 
-    private long maxForEmitBatchBytes = 0;
     private int queueSize = 10000;
     private int numEmitters = 1;
 
@@ -73,22 +72,6 @@ public class AsyncConfig extends PipesConfigBase {
     }
 
 
-    /**
-     *  What is the maximum bytes size per extract that
-     *  will be allowed in the emit queue.  If an extract is too
-     *  big, skip the emit queue and forward it directly from the processor.  If
-     *  set to <code>0</code>, this will never send an extract back for batch emitting,
-     *  but will emit the extract directly from the processor.
-     * @return
-     */
-    public long getMaxForEmitBatchBytes() {
-        return maxForEmitBatchBytes;
-    }
-
-    public void setMaxForEmitBatchBytes(long maxForEmitBatchBytes) {
-        this.maxForEmitBatchBytes = maxForEmitBatchBytes;
-    }
-
     public void setNumEmitters(int numEmitters) {
         this.numEmitters = numEmitters;
     }