You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/03/29 11:33:03 UTC
(tika) branch TIKA-4181-grpc updated: make it as a stale connection expiring store
This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-4181-grpc by this push:
new 114260749 make it as a stale connection expiring store
114260749 is described below
commit 1142607490f8b40d2741ee85ef0d3d60316d7e5d
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Fri Mar 29 06:32:57 2024 -0500
make it as a stale connection expiring store
---
.../org/apache/tika/pipes/PipesConfigBase.java | 21 +++++-
.../tika/pipes/grpc/ExpiringFetcherStore.java | 81 ++++++++++++++++++++++
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 30 ++++----
.../tika/pipes/grpc/ExpiringFetcherStoreTest.java | 41 +++++++++++
4 files changed, 157 insertions(+), 16 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
index bf6a6bb69..27e6a49fb 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfigBase.java
@@ -54,7 +54,12 @@ public class PipesConfigBase extends ConfigBase {
private int numClients = DEFAULT_NUM_CLIENTS;
private int maxFilesProcessedPerProcess = DEFAULT_MAX_FILES_PROCESSED_PER_PROCESS;
-
+    /** Default for {@link #getStaleFetcherTimeoutSeconds()}: 600 seconds (ten minutes). */
+    public static final int DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS = 600;
+    /** Default for {@link #getStaleFetcherDelaySeconds()}: 60 seconds (one minute). */
+    public static final int DEFAULT_STALE_FETCHER_DELAY_SECONDS = 60;
+    private int staleFetcherTimeoutSeconds = DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS;
+    private int staleFetcherDelaySeconds = DEFAULT_STALE_FETCHER_DELAY_SECONDS;
private List<String> forkedJvmArgs = new ArrayList<>();
private Path tikaConfig;
private String javaPath = "java";
@@ -171,4 +176,34 @@ public class PipesConfigBase extends ConfigBase {
public void setSleepOnStartupTimeoutMillis(long sleepOnStartupTimeoutMillis) {
this.sleepOnStartupTimeoutMillis = sleepOnStartupTimeoutMillis;
}
+
+    /**
+     * Returns the number of seconds a fetcher may go unaccessed before it is considered
+     * stale. Defaults to {@link #DEFAULT_STALE_FETCHER_TIMEOUT_SECONDS}.
+     */
+    public int getStaleFetcherTimeoutSeconds() {
+        return this.staleFetcherTimeoutSeconds;
+    }
+
+    /**
+     * Sets the number of seconds a fetcher may go unaccessed before it is considered stale.
+     */
+    public void setStaleFetcherTimeoutSeconds(int staleFetcherTimeoutSeconds) {
+        this.staleFetcherTimeoutSeconds = staleFetcherTimeoutSeconds;
+    }
+
+    /**
+     * Returns the interval, in seconds, between checks for stale fetchers. Defaults to
+     * {@link #DEFAULT_STALE_FETCHER_DELAY_SECONDS}.
+     */
+    public int getStaleFetcherDelaySeconds() {
+        return this.staleFetcherDelaySeconds;
+    }
+
+    /**
+     * Sets the interval, in seconds, between checks for stale fetchers.
+     */
+    public void setStaleFetcherDelaySeconds(int staleFetcherDelaySeconds) {
+        this.staleFetcherDelaySeconds = staleFetcherDelaySeconds;
+    }
}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
new file mode 100644
index 000000000..e7c67e3d9
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/ExpiringFetcherStore.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.grpc;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Holds named fetchers together with their configs, and deletes any fetcher that has not
+ * been accessed within a configurable number of seconds.
+ * <p>
+ * A single daemon thread sweeps for stale fetchers periodically; call {@link #close()} to
+ * stop it. The backing maps are {@link ConcurrentHashMap}s, so the store can be shared
+ * between threads.
+ */
+public class ExpiringFetcherStore implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExpiringFetcherStore.class);
+
+    /** Delay, in seconds, before the first stale-fetcher sweep runs. */
+    public static final long EXPIRE_JOB_INITIAL_DELAY = 1L;
+
+    /**
+     * Not read by this class: the sweep period is the constructor's
+     * {@code checkForExpiredFetchersDelaySeconds} argument. Kept for source compatibility.
+     */
+    public static final long EXPIRE_JOB_PERIOD = 60L;
+
+    private final Map<String, AbstractFetcher> fetchers = new ConcurrentHashMap<>();
+    private final Map<String, AbstractConfig> fetcherConfigs = new ConcurrentHashMap<>();
+    private final Map<String, Instant> fetcherLastAccessed = new ConcurrentHashMap<>();
+
+    // Daemon thread so a store that is never closed cannot keep the JVM from exiting.
+    final ScheduledExecutorService executorService =
+            Executors.newSingleThreadScheduledExecutor(runnable -> {
+                Thread thread = new Thread(runnable, "expiring-fetcher-store");
+                thread.setDaemon(true);
+                return thread;
+            });
+
+    /**
+     * Creates the store and starts the periodic sweep that deletes stale fetchers.
+     *
+     * @param expireAfterSeconds fetchers not accessed for longer than this many seconds are
+     *                           deleted by the next sweep
+     * @param checkForExpiredFetchersDelaySeconds period, in seconds, between sweeps
+     * @throws IllegalArgumentException if {@code checkForExpiredFetchersDelaySeconds} is not
+     *                                  positive
+     */
+    public ExpiringFetcherStore(int expireAfterSeconds, int checkForExpiredFetchersDelaySeconds) {
+        executorService.scheduleAtFixedRate(() -> {
+            try {
+                expireStaleFetchers(expireAfterSeconds);
+            } catch (RuntimeException e) {
+                // An exception escaping a periodic task suppresses all of its later runs,
+                // so log it here to keep the sweep alive.
+                LOG.error("Failed to expire stale fetchers", e);
+            }
+        }, EXPIRE_JOB_INITIAL_DELAY, checkForExpiredFetchersDelaySeconds, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Deletes every fetcher last accessed more than {@code expireAfterSeconds} seconds ago,
+     * as well as any fetcher that has no recorded access time.
+     */
+    private void expireStaleFetchers(int expireAfterSeconds) {
+        Instant now = Instant.now();
+        Set<String> expired = new HashSet<>();
+        for (String fetcherName : fetchers.keySet()) {
+            Instant lastAccessed = fetcherLastAccessed.get(fetcherName);
+            if (lastAccessed == null) {
+                LOG.error("Detected a fetcher with no last access time. FetcherName={}",
+                        fetcherName);
+                expired.add(fetcherName);
+            } else if (now.isAfter(lastAccessed.plusSeconds(expireAfterSeconds))) {
+                LOG.info("Detected stale fetcher {} that hasn't been accessed in {} seconds. "
+                                + "Deleting.", fetcherName,
+                        now.getEpochSecond() - lastAccessed.getEpochSecond());
+                expired.add(fetcherName);
+            }
+        }
+        expired.forEach(this::deleteFetcher);
+    }
+
+    /**
+     * Removes the fetcher, its config and its last access time. Unknown or {@code null} names
+     * are ignored.
+     */
+    public void deleteFetcher(String fetcherName) {
+        if (fetcherName == null) {
+            return;
+        }
+        // Remove the fetcher first so a concurrent sweep never sees a registered fetcher whose
+        // access time is already gone.
+        fetchers.remove(fetcherName);
+        fetcherConfigs.remove(fetcherName);
+        fetcherLastAccessed.remove(fetcherName);
+    }
+
+    /**
+     * Returns a read-only, live view of the registered fetchers keyed by name. Reading through
+     * it does not count as an access.
+     */
+    public Map<String, AbstractFetcher> getFetchers() {
+        return Collections.unmodifiableMap(fetchers);
+    }
+
+    /**
+     * Returns a read-only, live view of the fetcher configs keyed by fetcher name. Reading
+     * through it does not count as an access.
+     */
+    public Map<String, AbstractConfig> getFetcherConfigs() {
+        return Collections.unmodifiableMap(fetcherConfigs);
+    }
+
+    /**
+     * Returns the named fetcher and records the current time as its last access, which keeps
+     * the sweep from deleting it. Nothing is recorded if no fetcher has that name.
+     *
+     * @param fetcherName name of the fetcher
+     * @return the fetcher, or {@code null} if none is registered under {@code fetcherName}
+     */
+    @SuppressWarnings("unchecked")
+    public <T extends AbstractFetcher> T getFetcherAndLogAccess(String fetcherName) {
+        if (fetcherName == null) {
+            return null;
+        }
+        // computeIfPresent runs atomically for the key, so an access can never leave behind an
+        // access time for a fetcher that a concurrent deleteFetcher has already removed.
+        AbstractFetcher fetcher = fetchers.computeIfPresent(fetcherName, (name, existing) -> {
+            fetcherLastAccessed.put(name, Instant.now());
+            return existing;
+        });
+        // Unchecked: the caller picks T and must match the fetcher's actual class.
+        return (T) fetcher;
+    }
+
+    /**
+     * Registers (or replaces) a fetcher under {@link AbstractFetcher#getName()} and marks it
+     * as just accessed.
+     *
+     * @throws NullPointerException if the fetcher, its name or the config is {@code null}
+     */
+    public <T extends AbstractFetcher, C extends AbstractConfig> void createFetcher(T fetcher,
+                                                                                    C config) {
+        String fetcherName = Objects.requireNonNull(fetcher, "fetcher").getName();
+        Objects.requireNonNull(fetcherName, "fetcher name");
+        Objects.requireNonNull(config, "config");
+        // Record the access time before publishing the fetcher so the sweep never observes a
+        // registered fetcher without one.
+        fetcherLastAccessed.put(fetcherName, Instant.now());
+        fetcherConfigs.put(fetcherName, config);
+        fetchers.put(fetcherName, fetcher);
+    }
+
+    /** Stops the background sweep; registered fetchers are left in place. */
+    @Override
+    public void close() {
+        executorService.shutdownNow();
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
index c8e567460..dc1b0cba2 100644
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -75,16 +74,18 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
/**
* FetcherID is key, The pair is the Fetcher object and the Metadata
*/
- Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
- Map<String, AbstractConfig> fetcherConfigs = Collections.synchronizedMap(new HashMap<>());
PipesConfig pipesConfig = PipesConfig.load(Paths.get("tika-config.xml"));
PipesClient pipesClient = new PipesClient(pipesConfig);
+ ExpiringFetcherStore expiringFetcherStore;
String tikaConfigPath;
TikaGrpcServerImpl(String tikaConfigPath)
throws TikaConfigException, IOException, ParserConfigurationException,
TransformerException, SAXException {
+ expiringFetcherStore =
+ new ExpiringFetcherStore(pipesConfig.getStaleFetcherTimeoutSeconds(),
+ pipesConfig.getStaleFetcherDelaySeconds());
this.tikaConfigPath = tikaConfigPath;
updateTikaConfig();
}
@@ -93,14 +94,15 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
throws ParserConfigurationException, IOException, SAXException, TransformerException {
Document tikaConfigDoc =
DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(tikaConfigPath);
+
Element fetchersElement = (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
for (int i = 0; i < fetchersElement.getChildNodes().getLength(); ++i) {
fetchersElement.removeChild(fetchersElement.getChildNodes().item(i));
}
- for (var fetcherEntry : fetchers.entrySet()) {
+ for (var fetcherEntry : expiringFetcherStore.getFetchers().entrySet()) {
AbstractFetcher fetcherObject = fetcherEntry.getValue();
Map<String, Object> fetcherConfigParams =
- OBJECT_MAPPER.convertValue(fetcherConfigs.get(fetcherEntry.getKey()),
+ OBJECT_MAPPER.convertValue(expiringFetcherStore.getFetcherConfigs().get(fetcherEntry.getKey()),
new TypeReference<>() {
});
Element fetcher = tikaConfigDoc.createElement("fetcher");
@@ -166,7 +168,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
private void fetchAndParseImpl(FetchAndParseRequest request,
StreamObserver<FetchAndParseReply> responseObserver) {
- AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+ AbstractFetcher fetcher = expiringFetcherStore.getFetcherAndLogAccess(request.getFetcherName());
if (fetcher == null) {
throw new RuntimeException(
"Could not find fetcher with name " + request.getFetcherName());
@@ -233,8 +235,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
Initializable initializable = (Initializable) abstractFetcher;
initializable.initialize(tikaParamsMap);
}
- fetchers.put(name, abstractFetcher);
- fetcherConfigs.put(name, configObject);
+ expiringFetcherStore.createFetcher(abstractFetcher, configObject);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
InvocationTargetException | NoSuchMethodException | TikaConfigException e) {
throw new RuntimeException(e);
@@ -271,7 +272,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
public void getFetcher(GetFetcherRequest request,
StreamObserver<GetFetcherReply> responseObserver) {
GetFetcherReply.Builder getFetcherReply = GetFetcherReply.newBuilder();
- AbstractConfig abstractConfig = fetcherConfigs.get(request.getName());
+ AbstractConfig abstractConfig = expiringFetcherStore.getFetcherConfigs().get(request.getName());
Map<String, Object> paramMap =
OBJECT_MAPPER.convertValue(abstractConfig, new TypeReference<>() {
});
@@ -285,7 +286,7 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
public void listFetchers(ListFetchersRequest request,
StreamObserver<ListFetchersReply> responseObserver) {
ListFetchersReply.Builder listFetchersReplyBuilder = ListFetchersReply.newBuilder();
- for (Map.Entry<String, AbstractConfig> fetcherConfig : fetcherConfigs.entrySet()) {
+ for (Map.Entry<String, AbstractConfig> fetcherConfig : expiringFetcherStore.getFetcherConfigs().entrySet()) {
GetFetcherReply.Builder replyBuilder = createFetcherReply(fetcherConfig);
listFetchersReplyBuilder.addGetFetcherReply(replyBuilder.build());
}
@@ -295,8 +296,8 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
private GetFetcherReply.Builder createFetcherReply(
Map.Entry<String, AbstractConfig> fetcherConfig) {
- AbstractFetcher abstractFetcher = fetchers.get(fetcherConfig.getKey());
- AbstractConfig abstractConfig = fetcherConfigs.get(fetcherConfig.getKey());
+ AbstractFetcher abstractFetcher = expiringFetcherStore.getFetchers().get(fetcherConfig.getKey());
+ AbstractConfig abstractConfig = expiringFetcherStore.getFetcherConfigs().get(fetcherConfig.getKey());
GetFetcherReply.Builder replyBuilder =
GetFetcherReply.newBuilder().setFetcherClass(abstractFetcher.getClass().getName())
.setName(abstractFetcher.getName());
@@ -319,8 +320,10 @@ class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
}
}
- private void deleteFetcher(String name) {
- fetcherConfigs.remove(name);
- fetchers.remove(name);
+    /**
+     * Deletes the named fetcher from the expiring fetcher store.
+     */
+    private void deleteFetcher(String fetcherName) {
+        this.expiringFetcherStore.deleteFetcher(fetcherName);
}
}
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
new file mode 100644
index 000000000..f407b77cb
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/ExpiringFetcherStoreTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.grpc;
+
+import java.io.InputStream;
+import java.time.Duration;
+import java.time.Instant;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+class ExpiringFetcherStoreTest {
+
+    @Test
+    void createFetcher() throws InterruptedException {
+        // Expire after 1 second and sweep every second: with a 60-second sweep period, the
+        // first sweep runs 1 second after construction, when the fetcher may not yet be more
+        // than 1 second old, and the next sweep is a minute away.
+        ExpiringFetcherStore expiringFetcherStore = new ExpiringFetcherStore(1, 1);
+        try {
+            AbstractFetcher fetcher = new AbstractFetcher() {
+                @Override
+                public InputStream fetch(String fetchKey, Metadata metadata) {
+                    return null;
+                }
+            };
+            fetcher.setName("nick");
+            AbstractConfig config = new AbstractConfig() {
+
+            };
+            expiringFetcherStore.createFetcher(fetcher, config);
+
+            Assertions.assertNotNull(expiringFetcherStore.getFetchers().get(fetcher.getName()));
+
+            // Poll rather than sleep a fixed time so the outcome does not hinge on when the
+            // sweep happens to be scheduled.
+            Instant deadline = Instant.now().plus(Duration.ofSeconds(10));
+            while ((expiringFetcherStore.getFetchers().containsKey(fetcher.getName())
+                    || expiringFetcherStore.getFetcherConfigs().containsKey(fetcher.getName()))
+                    && Instant.now().isBefore(deadline)) {
+                Thread.sleep(100L);
+            }
+
+            Assertions.assertNull(expiringFetcherStore.getFetchers().get(fetcher.getName()));
+            Assertions.assertNull(
+                    expiringFetcherStore.getFetcherConfigs().get(fetcher.getName()));
+        } finally {
+            // Stop the background sweep thread started by the store.
+            expiringFetcherStore.executorService.shutdownNow();
+        }
+    }
+}