You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/03/29 11:33:03 UTC

(tika) branch TIKA-4181-grpc updated: make it as a stale connection expiring store

This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/TIKA-4181-grpc by this push:
     new 114260749 make it as a stale connection expiring store
114260749 is described below

commit 1142607490f8b40d2741ee85ef0d3d60316d7e5d
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Fri Mar 29 06:32:57 2024 -0500

    make it as a stale connection expiring store
---
 .../org/apache/tika/pipes/PipesConfigBase.java     | 21 +++++-
 .../tika/pipes/grpc/ExpiringFetcherStore.java      | 81 ++++++++++++++++++++++
 .../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 30 ++++----
 .../tika/pipes/grpc/ExpiringFetcherStoreTest.java  | 41 +++++++++++
 4 files changed, 157 insertions(+), 16 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index bf6a6bb69..27e6a49fb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -54,7 +54,10 @@ public class PipesConfigBase extends ConfigBase {
     private int numClients = DEFAULT_NUM_CLIENTS;
 
     private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;
-
+    public static final int DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS = 600;
+    private int staleFetcherTimeoutSeconds = DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS;
+    public static final int DEFAULT_STALE_FETCHER_DELAY_SECONDS = 60;
+    private int staleFetcherDelaySeconds = DEFAULT_STALE_FETCHER_DELAY_SECONDS;
     private List<String> forkedJvmArgs = new ArrayList<>();
     private Path tikaConfig;
     private String javaPath = "java";
@@ -171,4 +174,20 @@ public class PipesConfigBase extends ConfigBase {
     public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
         this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
     }
+
+    /**
+     * Returns the stale-fetcher timeout in seconds.
+     *
+     * @return the configured value; {@link #DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS} unless it
+     *         was changed through {@link #setStaleFetcherTimeoutSeconds(int)}
+     */
+    public int getStaleFetcherTimeoutSeconds() {
+        return staleFetcherTimeoutSeconds;
+    }
+
+    /**
+     * Sets the stale-fetcher timeout in seconds. The value is stored as given; no range
+     * validation is performed here.
+     *
+     * @param staleFetcherTimeoutSeconds the new timeout, in seconds
+     */
+    public void setStaleFetcherTimeoutSeconds(int staleFetcherTimeoutSeconds) {
+        this.staleFetcherTimeoutSeconds = staleFetcherTimeoutSeconds;
+    }
+
+    /**
+     * Returns the stale-fetcher delay in seconds.
+     *
+     * @return the configured value; {@link #DEFAULT_STALE_FETCHER_DELAY_SECONDS} unless it
+     *         was changed through {@link #setStaleFetcherDelaySeconds(int)}
+     */
+    public int getStaleFetcherDelaySeconds() {
+        return staleFetcherDelaySeconds;
+    }
+
+    /**
+     * Sets the stale-fetcher delay in seconds. The value is stored as given; no range
+     * validation is performed here.
+     *
+     * @param staleFetcherDelaySeconds the new delay, in seconds
+     */
+    public void setStaleFetcherDelaySeconds(int staleFetcherDelaySeconds) {
+        this.staleFetcherDelaySeconds = staleFetcherDelaySeconds;
+    }
 }
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
new file mode 100644
index 000000000..e7c67e3d9
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
@@ -0,0 +1,81 @@
+package org.apache.tika.pipes.grpc;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Registry of fetchers and their configs that evicts entries which have not been accessed
+ * for a configurable number of seconds.
+ * <p>
+ * A single background thread periodically scans the registered fetchers and removes the stale
+ * ones. All compound operations (create, access, delete, expiry scan) are performed while
+ * holding the monitor of the {@code fetchers} map so that the three internal maps always stay
+ * consistent with each other.
+ * <p>
+ * Call {@link #close()} to stop the background job when the store is no longer needed.
+ */
+public class ExpiringFetcherStore implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExpiringFetcherStore.class);
+    /** Delay, in seconds, before the first run of the expiry job. */
+    public static final long EXPIRE_JOB_INITIAL_DELAY = 1L;
+    /**
+     * Not used by this class: the period of the expiry job is taken from the constructor
+     * argument. Kept so existing references keep compiling.
+     */
+    public static final long EXPIRE_JOB_PERIOD = 60L;
+
+    private final Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
+    private final Map<String, AbstractConfig> fetcherConfigs = Collections.synchronizedMap(new HashMap<>());
+    private final Map<String, Instant> fetcherLastAccessed =
+            Collections.synchronizedMap(new HashMap<>());
+
+    // Daemon thread: a housekeeping job must not be the thing that keeps the JVM alive.
+    final ScheduledExecutorService executorService =
+            Executors.newSingleThreadScheduledExecutor(runnable -> {
+                Thread thread = new Thread(runnable, "expiring-fetcher-store");
+                thread.setDaemon(true);
+                return thread;
+            });
+
+    /**
+     * Creates the store and schedules the expiry job.
+     *
+     * @param expireAfterSeconds                  a fetcher is removed once this many seconds
+     *                                            have passed since it was last accessed
+     * @param checkForExpiredFetchersDelaySeconds period, in seconds, between two runs of the
+     *                                            expiry job; must be greater than zero
+     * @throws IllegalArgumentException if {@code checkForExpiredFetchersDelaySeconds} is not
+     *                                  positive (thrown by the scheduler)
+     */
+    public ExpiringFetcherStore(int expireAfterSeconds, int checkForExpiredFetchersDelaySeconds) {
+        executorService.scheduleAtFixedRate(() -> {
+            try {
+                removeExpiredFetchers(expireAfterSeconds);
+            } catch (RuntimeException e) {
+                // An exception escaping a scheduleAtFixedRate task cancels every later run,
+                // so log it and let the next scheduled run retry.
+                LOG.error("Failed to expire stale fetchers", e);
+            }
+        }, EXPIRE_JOB_INITIAL_DELAY, checkForExpiredFetchersDelaySeconds, TimeUnit.SECONDS);
+    }
+
+    /** Removes every fetcher whose last access is older than {@code expireAfterSeconds}. */
+    private void removeExpiredFetchers(int expireAfterSeconds) {
+        // Iterating a view of a synchronizedMap requires holding the map's monitor; holding it
+        // for the whole scan also prevents evicting a fetcher that is accessed mid-scan.
+        synchronized (fetchers) {
+            Instant now = Instant.now();
+            Set<String> expired = new HashSet<>();
+            for (String fetcherName : fetchers.keySet()) {
+                Instant lastAccessed = fetcherLastAccessed.get(fetcherName);
+                if (lastAccessed == null) {
+                    LOG.error("Detected a fetcher with no last access time. FetcherName={}",
+                            fetcherName);
+                    expired.add(fetcherName);
+                } else if (now.isAfter(lastAccessed.plusSeconds(expireAfterSeconds))) {
+                    LOG.info("Detected stale fetcher {} hasn't been access in {} seconds. " +
+                                    "Deleting.",
+                            fetcherName, now.getEpochSecond() - lastAccessed.getEpochSecond());
+                    expired.add(fetcherName);
+                }
+            }
+            // Delete after the scan: removing while iterating the key set would throw.
+            for (String expiredFetcherId : expired) {
+                deleteFetcher(expiredFetcherId);
+            }
+        }
+    }
+
+    /**
+     * Removes the fetcher, its config and its last-access record. Does nothing if no fetcher
+     * with that name is registered.
+     *
+     * @param fetcherName name of the fetcher to remove
+     */
+    public void deleteFetcher(String fetcherName) {
+        synchronized (fetchers) {
+            fetchers.remove(fetcherName);
+            fetcherConfigs.remove(fetcherName);
+            fetcherLastAccessed.remove(fetcherName);
+        }
+    }
+
+    /**
+     * Returns the live, synchronized map of fetchers keyed by name. Reading through this map
+     * does not count as an access for expiry purposes. Callers that iterate over it must
+     * synchronize on the returned map.
+     */
+    public Map<String, AbstractFetcher> getFetchers() {
+        return fetchers;
+    }
+
+    /**
+     * Returns the live, synchronized map of fetcher configs keyed by fetcher name. Callers
+     * that iterate over it must synchronize on the returned map.
+     */
+    public Map<String, AbstractConfig> getFetcherConfigs() {
+        return fetcherConfigs;
+    }
+
+    /**
+     * Returns the fetcher with the given name and records the current time as its last access,
+     * which keeps the expiry job from removing it as stale.
+     *
+     * @param fetcherName name of the fetcher to look up
+     * @return the fetcher, or {@code null} if none is registered under that name; in that case
+     *         no access time is recorded
+     */
+    @SuppressWarnings("unchecked") // the caller chooses T; a wrong choice fails at the call site
+    public <T extends AbstractFetcher> T getFetcherAndLogAccess(String fetcherName) {
+        synchronized (fetchers) {
+            AbstractFetcher fetcher = fetchers.get(fetcherName);
+            if (fetcher != null) {
+                // Only track names that exist: the expiry job scans `fetchers`, so a record
+                // for an unknown name would never be cleaned up.
+                fetcherLastAccessed.put(fetcherName, Instant.now());
+            }
+            return (T) fetcher;
+        }
+    }
+
+    /**
+     * Registers the fetcher and its config under {@code fetcher.getName()}, replacing any
+     * previous entry with that name, and marks it as accessed now.
+     *
+     * @param fetcher the fetcher to register
+     * @param config  the config the fetcher was created from
+     */
+    public <T extends AbstractFetcher, C extends AbstractConfig> void createFetcher(T fetcher, C config) {
+        synchronized (fetchers) {
+            String fetcherName = fetcher.getName();
+            fetchers.put(fetcherName, fetcher);
+            fetcherConfigs.put(fetcherName, config);
+            fetcherLastAccessed.put(fetcherName, Instant.now());
+        }
+    }
+
+    /** Stops the background expiry job. Registered fetchers are left in place. */
+    @Override
+    public void close() {
+        executorService.shutdownNow();
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index c8e567460..dc1b0cba2 100644
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -75,16 +74,18 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     /**
      * FetcherID is key, The pair is the Fetcher object and the Metadata
      */
-    Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
-    Map<String, AbstractConfig> fetcherConfigs = Collections.synchronizedMap(new HashMap<>());
     PipesConfig pipesConfig = PipesConfig.load(Paths.get("tika-config.xml"));
     PipesClient pipesClient = new PipesClient(pipesConfig);
+    ExpiringFetcherStore expiringFetcherStore;
 
     String tikaConfigPath;
 
+    /**
+     * Creates the service: builds the {@link ExpiringFetcherStore} from the stale-fetcher
+     * timeout and delay of {@code pipesConfig}, stores the config path and then calls
+     * {@link #updateTikaConfig()}.
+     *
+     * @param tikaConfigPath location of the tika config document that
+     *                       {@code updateTikaConfig()} parses
+     */
+    // NOTE(review): pipesConfig is loaded by a field initializer from the fixed path
+    // "tika-config.xml", not from tikaConfigPath - confirm this is intended.
     TikaGrpcServerImpl(String tikaConfigPath)
             throws TikaConfigException, IOException, ParserConfigurationException,
             TransformerException, SAXException {
+        expiringFetcherStore =
+                new ExpiringFetcherStore(pipesConfig.getStaleFetcherTimeoutSeconds(),
+                        pipesConfig.getStaleFetcherDelaySeconds());
         this.tikaConfigPath = tikaConfigPath;
         updateTikaConfig();
     }
@@ -93,14 +94,15 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
             throws ParserConfigurationException, IOException, SAXException, TransformerException {
         Document tikaConfigDoc =
                 DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(tikaConfigPath);
+
         Element fetchersElement = (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
         for (int i = 0; i < fetchersElement.getChildNodes().getLength(); ++i) {
             fetchersElement.removeChild(fetchersElement.getChildNodes().item(i));
         }
-        for (var fetcherEntry : fetchers.entrySet()) {
+        for (var fetcherEntry : expiringFetcherStore.getFetchers().entrySet()) {
             AbstractFetcher fetcherObject = fetcherEntry.getValue();
             Map<String, Object> fetcherConfigParams =
-                    OBJECT_MAPPER.convertValue(fetcherConfigs.get(fetcherEntry.getKey()),
+                    OBJECT_MAPPER.convertValue(expiringFetcherStore.getFetcherConfigs().get(fetcherEntry.getKey()),
                             new TypeReference<>() {
                             });
             Element fetcher = tikaConfigDoc.createElement("fetcher");
@@ -166,7 +168,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
 
     private void fetchAndParseImpl(FetchAndParseRequest request,
                                    StreamObserver<FetchAndParseReply> responseObserver) {
-        AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+        AbstractFetcher fetcher = expiringFetcherStore.getFetcherAndLogAccess(request.getFetcherName());
         if (fetcher == null) {
             throw new RuntimeException(
                     "Could not find fetcher with name " + request.getFetcherName());
@@ -233,8 +235,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
                 Initializable initializable = (Initializable) abstractFetcher;
                 initializable.initialize(tikaParamsMap);
             }
-            fetchers.put(name, abstractFetcher);
-            fetcherConfigs.put(name, configObject);
+            expiringFetcherStore.createFetcher(abstractFetcher, configObject);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
                  InvocationTargetException | NoSuchMethodException | TikaConfigException e) {
             throw new RuntimeException(e);
@@ -271,7 +272,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     public void getFetcher(GetFetcherRequest request,
                            StreamObserver<GetFetcherReply> responseObserver) {
         GetFetcherReply.Builder getFetcherReply = GetFetcherReply.newBuilder();
-        AbstractConfig abstractConfig = fetcherConfigs.get(request.getName());
+        AbstractConfig abstractConfig = expiringFetcherStore.getFetcherConfigs().get(request.getName());
         Map<String, Object> paramMap =
                 OBJECT_MAPPER.convertValue(abstractConfig, new TypeReference<>() {
                 });
@@ -285,7 +286,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
     public void listFetchers(ListFetchersRequest request,
                              StreamObserver<ListFetchersReply> responseObserver) {
         ListFetchersReply.Builder listFetchersReplyBuilder = ListFetchersReply.newBuilder();
-        for (Map.Entry<String, AbstractConfig> fetcherConfig : fetcherConfigs.entrySet()) {
+        for (Map.Entry<String, AbstractConfig> fetcherConfig : expiringFetcherStore.getFetcherConfigs().entrySet()) {
             GetFetcherReply.Builder replyBuilder = createFetcherReply(fetcherConfig);
             listFetchersReplyBuilder.addGetFetcherReply(replyBuilder.build());
         }
@@ -295,8 +296,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
 
     private GetFetcherReply.Builder createFetcherReply(
             Map.Entry<String, AbstractConfig> fetcherConfig) {
-        AbstractFetcher abstractFetcher = fetchers.get(fetcherConfig.getKey());
-        AbstractConfig abstractConfig = fetcherConfigs.get(fetcherConfig.getKey());
+        AbstractFetcher abstractFetcher = expiringFetcherStore.getFetchers().get(fetcherConfig.getKey());
+        AbstractConfig abstractConfig = expiringFetcherStore.getFetcherConfigs().get(fetcherConfig.getKey());
         GetFetcherReply.Builder replyBuilder =
                 GetFetcherReply.newBuilder().setFetcherClass(abstractFetcher.getClass().getName())
                         .setName(abstractFetcher.getName());
@@ -319,8 +320,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
         }
     }
 
-    private void deleteFetcher(String name) {
-        fetcherConfigs.remove(name);
-        fetchers.remove(name);
+    /**
+     * Removes the named fetcher by delegating to
+     * {@link ExpiringFetcherStore#deleteFetcher(String)}.
+     *
+     * @param fetcherName name of the fetcher to remove
+     */
+    private void deleteFetcher(String fetcherName) {
+        expiringFetcherStore.deleteFetcher(fetcherName);
     }
 }
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
new file mode 100644
index 000000000..f407b77cb
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
@@ -0,0 +1,41 @@
+package org.apache.tika.pipes.grpc;
+
+import java.io.InputStream;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Verifies that {@link ExpiringFetcherStore} evicts a fetcher that has not been accessed
+ * within the configured timeout.
+ */
+class ExpiringFetcherStoreTest {
+
+    /** Upper bound on how long the test waits for the expiry job to evict the fetcher. */
+    private static final long MAX_WAIT_NANOS = 10_000_000_000L;
+
+    @Test
+    void createFetcher() throws InterruptedException {
+        // Expire after 1s idle and re-check every second. With a long check period the only
+        // run inside the test window is the one ~1s after construction, which fires before
+        // the fetcher (created slightly later) is a full second old.
+        ExpiringFetcherStore expiringFetcherStore = new ExpiringFetcherStore(1, 1);
+
+        AbstractFetcher fetcher = new AbstractFetcher() {
+            @Override
+            public InputStream fetch(String fetchKey, Metadata metadata) {
+                return null;
+            }
+        };
+        fetcher.setName("nick");
+        AbstractConfig config = new AbstractConfig() {
+
+        };
+        expiringFetcherStore.createFetcher(fetcher, config);
+
+        Assertions.assertNotNull(expiringFetcherStore.getFetchers().get(fetcher.getName()));
+
+        // Poll instead of sleeping a fixed time so the test does not race the scheduler.
+        long deadline = System.nanoTime() + MAX_WAIT_NANOS;
+        while (expiringFetcherStore.getFetchers().get(fetcher.getName()) != null &&
+                System.nanoTime() - deadline < 0) {
+            Thread.sleep(100L);
+        }
+
+        Assertions.assertNull(expiringFetcherStore.getFetchers().get(fetcher.getName()));
+        Assertions.assertNull(expiringFetcherStore.getFetcherConfigs().get(fetcher.getName()));
+    }
+}