You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/03/02 15:05:07 UTC
(tika) branch TIKA-4181-grpc updated: latest updates - wip
This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-4181-grpc by this push:
new 7b40aae09 latest updates - wip
7b40aae09 is described below
commit 7b40aae09bbfd63a209e89628578b7926db0a00b
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Sat Mar 2 09:04:56 2024 -0600
latest updates - wip
---
.../tika/pipes/fetcher/config/AbstractConfig.java | 4 +
.../tika/pipes/fetcher/fs/FileSystemFetcher.java | 8 +
.../fetcher/fs/config/FileSystemFetcherConfig.java | 26 ++
.../tika/pipes/fetcher/azblob/AZBlobFetcher.java | 11 +
.../fetcher/azblob/config/AZBlobFetcherConfig.java | 56 +++++
.../apache/tika/pipes/fetcher/gcs/GCSFetcher.java | 9 +
.../pipes/fetcher/gcs/config/GCSFetcherConfig.java | 46 ++++
.../tika/pipes/fetcher/http/HttpFetcher.java | 26 ++
.../fetcher/http/config/HttpFetcherConfig.java | 178 +++++++++++++
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 24 ++
.../pipes/fetcher/s3/config/S3FetcherConfig.java | 156 ++++++++++++
tika-pipes/tika-grpc/pom.xml | 38 ++-
.../org/apache/tika/pipes/grpc/TikaGrpcServer.java | 77 ++++++
.../apache/tika/pipes/grpc/TikaGrpcServerImpl.java | 278 +++++++++++++++++++++
.../org/apache/tika/pipes/grpc/TikaServer.java | 217 ----------------
tika-pipes/tika-grpc/src/main/proto/tika.proto | 49 +++-
.../org/apache/tika/pipes/grpc/TikaClient.java | 12 +-
.../apache/tika/pipes/grpc/TikaGrpcServerTest.java | 59 +++++
.../org/apache/tika/pipes/grpc/TikaServerTest.java | 59 -----
tika-pipes/tika-grpc/tika-config.xml | 4 -
20 files changed, 1038 insertions(+), 299 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/AbstractConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/AbstractConfig.java
new file mode 100644
index 000000000..536fc44b1
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/config/AbstractConfig.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.config;
+
+/**
+ * Common supertype for the plain settings beans that can be handed to a fetcher's
+ * config-taking constructor (for example {@code FileSystemFetcherConfig}).
+ * It declares no state or behaviour of its own.
+ */
+public abstract class AbstractConfig {
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index d926e3ca6..a587fb0bd 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -42,8 +42,16 @@ import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.Property;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.fs.config.FileSystemFetcherConfig;
public class FileSystemFetcher extends AbstractFetcher implements Initializable {
+ public FileSystemFetcher() {
+ }
+
+ public FileSystemFetcher(FileSystemFetcherConfig fileSystemFetcherConfig) {
+ setBasePath(fileSystemFetcherConfig.getBasePath());
+ setExtractFileSystemMetadata(fileSystemFetcherConfig.isExtractFileSystemMetadata());
+ }
private static final Logger LOG = LoggerFactory.getLogger(FileSystemFetcher.class);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/config/FileSystemFetcherConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/config/FileSystemFetcherConfig.java
new file mode 100644
index 000000000..aa02cccae
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/config/FileSystemFetcherConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.fs.config;
+
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Settings bean for {@code FileSystemFetcher}. Its values are copied into the fetcher
+ * by the {@code FileSystemFetcher(FileSystemFetcherConfig)} constructor.
+ * Every setter returns {@code this} so calls can be chained.
+ */
+public class FileSystemFetcherConfig extends AbstractConfig {
+    /** Passed to {@code FileSystemFetcher#setBasePath}. */
+    private String basePath;
+    /** Passed to {@code FileSystemFetcher#setExtractFileSystemMetadata}. */
+    private boolean extractFileSystemMetadata;
+
+    public String getBasePath() {
+        return basePath;
+    }
+
+    public FileSystemFetcherConfig setBasePath(String basePath) {
+        this.basePath = basePath;
+        return this;
+    }
+
+    public boolean isExtractFileSystemMetadata() {
+        return extractFileSystemMetadata;
+    }
+
+    public FileSystemFetcherConfig setExtractFileSystemMetadata(boolean extractFileSystemMetadata) {
+        this.extractFileSystemMetadata = extractFileSystemMetadata;
+        return this;
+    }
+}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
index dee903040..235789a61 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/AZBlobFetcher.java
@@ -44,6 +44,7 @@ import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.azblob.config.AZBlobFetcherConfig;
import org.apache.tika.utils.StringUtils;
/**
@@ -57,6 +58,27 @@ import org.apache.tika.utils.StringUtils;
* your requests, your fetchKey will be the complete SAS url pointing to the blob.
*/
public class AZBlobFetcher extends AbstractFetcher implements Initializable {
+    /**
+     * Creates a fetcher with all settings left at their initial values.
+     */
+    public AZBlobFetcher() {
+        // intentionally empty
+    }
+
+    /**
+     * Creates a fetcher and copies the container, endpoint, SAS token, spool-to-temp
+     * and extract-user-metadata settings of the given config bean into it.
+     *
+     * @param azBlobFetcherConfig source of the settings applied below
+     */
+    public AZBlobFetcher(AZBlobFetcherConfig azBlobFetcherConfig) {
+        this();
+        this.setContainer(azBlobFetcherConfig.getContainer());
+        this.setEndpoint(azBlobFetcherConfig.getEndpoint());
+        this.setSasToken(azBlobFetcherConfig.getSasToken());
+        this.setSpoolToTemp(azBlobFetcherConfig.isSpoolToTemp());
+        this.setExtractUserMetadata(azBlobFetcherConfig.isExtractUserMetadata());
+    }
private static String PREFIX = "az-blob";
private static final Logger LOGGER = LoggerFactory.getLogger(AZBlobFetcher.class);
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/config/AZBlobFetcherConfig.java b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/config/AZBlobFetcherConfig.java
new file mode 100644
index 000000000..5e64d85d0
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-az-blob/src/main/java/org/apache/tika/pipes/fetcher/azblob/config/AZBlobFetcherConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.azblob.config;
+
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Settings bean for {@code AZBlobFetcher}. Its values are copied into the fetcher by
+ * the {@code AZBlobFetcher(AZBlobFetcherConfig)} constructor.
+ * Every setter returns {@code this} so calls can be chained.
+ * <p>
+ * {@link #getSasToken()} is an access credential (Shared Access Signature); do not log it.
+ */
+public class AZBlobFetcherConfig extends AbstractConfig {
+    private boolean spoolToTemp;
+    private String sasToken;
+    private String endpoint;
+    private String container;
+    private boolean extractUserMetadata;
+
+    public boolean isSpoolToTemp() {
+        return spoolToTemp;
+    }
+
+    public AZBlobFetcherConfig setSpoolToTemp(boolean spoolToTemp) {
+        this.spoolToTemp = spoolToTemp;
+        return this;
+    }
+
+    public String getSasToken() {
+        return sasToken;
+    }
+
+    public AZBlobFetcherConfig setSasToken(String sasToken) {
+        this.sasToken = sasToken;
+        return this;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public AZBlobFetcherConfig setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+        return this;
+    }
+
+    public String getContainer() {
+        return container;
+    }
+
+    public AZBlobFetcherConfig setContainer(String container) {
+        this.container = container;
+        return this;
+    }
+
+    public boolean isExtractUserMetadata() {
+        return extractUserMetadata;
+    }
+
+    public AZBlobFetcherConfig setExtractUserMetadata(boolean extractUserMetadata) {
+        this.extractUserMetadata = extractUserMetadata;
+        return this;
+    }
+}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
index 6881c5a66..747fde691 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/GCSFetcher.java
@@ -40,12 +40,33 @@ import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.gcs.config.GCSFetcherConfig;
/**
* Fetches files from google cloud storage. Must set projectId and bucket via the config.
*/
public class GCSFetcher extends AbstractFetcher implements Initializable {
+    /**
+     * Creates a fetcher with all settings left at their initial values.
+     */
+    public GCSFetcher() {
+        // intentionally empty
+    }
+
+    /**
+     * Creates a fetcher and copies the bucket, project id, spool-to-temp and
+     * extract-user-metadata settings of the given config bean into it.
+     *
+     * @param gcsFetcherConfig source of the settings applied below
+     */
+    public GCSFetcher(GCSFetcherConfig gcsFetcherConfig) {
+        this();
+        this.setBucket(gcsFetcherConfig.getBucket());
+        this.setProjectId(gcsFetcherConfig.getProjectId());
+        this.setSpoolToTemp(gcsFetcherConfig.isSpoolToTemp());
+        this.setExtractUserMetadata(gcsFetcherConfig.isExtractUserMetadata());
+    }
private static String PREFIX = "gcs";
private static final Logger LOGGER = LoggerFactory.getLogger(GCSFetcher.class);
private String projectId;
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/config/GCSFetcherConfig.java b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/config/GCSFetcherConfig.java
new file mode 100644
index 000000000..9988ceedd
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-gcs/src/main/java/org/apache/tika/pipes/fetcher/gcs/config/GCSFetcherConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.gcs.config;
+
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Settings bean for {@code GCSFetcher}. Its values are copied into the fetcher by
+ * the {@code GCSFetcher(GCSFetcherConfig)} constructor.
+ * Every setter returns {@code this} so calls can be chained.
+ */
+public class GCSFetcherConfig extends AbstractConfig {
+    private boolean spoolToTemp;
+    private String projectId;
+    private String bucket;
+    private boolean extractUserMetadata;
+
+    public boolean isSpoolToTemp() {
+        return spoolToTemp;
+    }
+
+    public GCSFetcherConfig setSpoolToTemp(boolean spoolToTemp) {
+        this.spoolToTemp = spoolToTemp;
+        return this;
+    }
+
+    public String getProjectId() {
+        return projectId;
+    }
+
+    public GCSFetcherConfig setProjectId(String projectId) {
+        this.projectId = projectId;
+        return this;
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public GCSFetcherConfig setBucket(String bucket) {
+        this.bucket = bucket;
+        return this;
+    }
+
+    public boolean isExtractUserMetadata() {
+        return extractUserMetadata;
+    }
+
+    public GCSFetcherConfig setExtractUserMetadata(boolean extractUserMetadata) {
+        this.extractUserMetadata = extractUserMetadata;
+        return this;
+    }
+}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
index 35e6f3e82..0645be4ff 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
@@ -69,13 +69,58 @@ import org.apache.tika.metadata.Property;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
import org.apache.tika.pipes.fetcher.RangeFetcher;
+import org.apache.tika.pipes.fetcher.http.config.HttpFetcherConfig;
import org.apache.tika.utils.StringUtils;
/**
* Based on Apache httpclient
*/
public class HttpFetcher extends AbstractFetcher implements Initializable, RangeFetcher {
+    /**
+     * Creates a fetcher with all settings left at their initial values.
+     */
+    public HttpFetcher() {
+    }
+
+    /**
+     * Creates a fetcher and copies the timeouts, connection limits, headers,
+     * user agent, credentials and proxy settings of the given config bean into it.
+     * <p>
+     * NOTE(review): the numeric fields of {@link HttpFetcherConfig} are primitives
+     * without initializers, so a value the caller never set arrives here as 0 and
+     * replaces any default this class may have; confirm that is intended for the
+     * timeouts and connection limits.
+     *
+     * @param httpFetcherConfig source of the settings applied below
+     */
+    public HttpFetcher(HttpFetcherConfig httpFetcherConfig) {
+        setConnectTimeout(httpFetcherConfig.getConnectTimeout());
+        setRequestTimeout(httpFetcherConfig.getRequestTimeout());
+        setSocketTimeout(httpFetcherConfig.getSocketTimeout());
+        setOverallTimeout(httpFetcherConfig.getOverallTimeout());
+
+        setMaxErrMsgSize(httpFetcherConfig.getMaxErrMsgSize());
+        setMaxConnections(httpFetcherConfig.getMaxConnections());
+        setMaxConnectionsPerRoute(httpFetcherConfig.getMaxConnectionsPerRoute());
+        setMaxRedirects(httpFetcherConfig.getMaxRedirects());
+        setMaxSpoolSize(httpFetcherConfig.getMaxSpoolSize());
+
+        // HttpFetcherConfig leaves headers null unless they were set explicitly;
+        // skip the call in that case rather than hand setHttpHeaders a null list.
+        if (httpFetcherConfig.getHeaders() != null) {
+            setHttpHeaders(httpFetcherConfig.getHeaders());
+        }
+        setUserAgent(httpFetcherConfig.getUserAgent());
+
+        setUserName(httpFetcherConfig.getUserName());
+        setPassword(httpFetcherConfig.getPassword());
+        setNtDomain(httpFetcherConfig.getNtDomain());
+        setAuthScheme(httpFetcherConfig.getAuthScheme());
+
+        setProxyHost(httpFetcherConfig.getProxyHost());
+        setProxyPort(httpFetcherConfig.getProxyPort());
+    }
public static String HTTP_HEADER_PREFIX = "http-header:";
public static String HTTP_FETCH_PREFIX = "http-connection:";
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/config/HttpFetcherConfig.java b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/config/HttpFetcherConfig.java
new file mode 100644
index 000000000..1372f1355
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/config/HttpFetcherConfig.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.http.config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Settings bean for {@code HttpFetcher}. Its values are copied into the fetcher by the
+ * {@code HttpFetcher(HttpFetcherConfig)} constructor.
+ * Every setter returns {@code this} so calls can be chained.
+ * <p>
+ * Numeric fields have no initializers and therefore default to 0 until set.
+ * {@link #getPassword()} holds a credential; take care not to log this bean's values.
+ */
+public class HttpFetcherConfig extends AbstractConfig {
+    // authentication
+    private String userName;
+    private String password;
+    private String ntDomain;
+    private String authScheme;
+    // proxy
+    private String proxyHost;
+    private int proxyPort;
+    // timeouts
+    private int connectTimeout;
+    private int requestTimeout;
+    private int socketTimeout;
+    private long overallTimeout;
+    // connection, redirect and size limits
+    private int maxConnections;
+    private int maxConnectionsPerRoute;
+    private long maxSpoolSize;
+    private int maxRedirects;
+    private int maxErrMsgSize;
+    // request decoration
+    private List<String> headers;
+    private String userAgent;
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public HttpFetcherConfig setUserName(String userName) {
+        this.userName = userName;
+        return this;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public HttpFetcherConfig setPassword(String password) {
+        this.password = password;
+        return this;
+    }
+
+    public String getNtDomain() {
+        return ntDomain;
+    }
+
+    public HttpFetcherConfig setNtDomain(String ntDomain) {
+        this.ntDomain = ntDomain;
+        return this;
+    }
+
+    public String getAuthScheme() {
+        return authScheme;
+    }
+
+    public HttpFetcherConfig setAuthScheme(String authScheme) {
+        this.authScheme = authScheme;
+        return this;
+    }
+
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    public HttpFetcherConfig setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+        return this;
+    }
+
+    public int getProxyPort() {
+        return proxyPort;
+    }
+
+    public HttpFetcherConfig setProxyPort(int proxyPort) {
+        this.proxyPort = proxyPort;
+        return this;
+    }
+
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public HttpFetcherConfig setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+        return this;
+    }
+
+    public int getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public HttpFetcherConfig setRequestTimeout(int requestTimeout) {
+        this.requestTimeout = requestTimeout;
+        return this;
+    }
+
+    public int getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public HttpFetcherConfig setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+        return this;
+    }
+
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    public HttpFetcherConfig setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+        return this;
+    }
+
+    public int getMaxConnectionsPerRoute() {
+        return maxConnectionsPerRoute;
+    }
+
+    public HttpFetcherConfig setMaxConnectionsPerRoute(int maxConnectionsPerRoute) {
+        this.maxConnectionsPerRoute = maxConnectionsPerRoute;
+        return this;
+    }
+
+    public long getMaxSpoolSize() {
+        return maxSpoolSize;
+    }
+
+    public HttpFetcherConfig setMaxSpoolSize(long maxSpoolSize) {
+        this.maxSpoolSize = maxSpoolSize;
+        return this;
+    }
+
+    public int getMaxRedirects() {
+        return maxRedirects;
+    }
+
+    public HttpFetcherConfig setMaxRedirects(int maxRedirects) {
+        this.maxRedirects = maxRedirects;
+        return this;
+    }
+
+    public List<String> getHeaders() {
+        return headers;
+    }
+
+    /**
+     * Stores a copy of {@code headers}, so later changes to the caller's list do not
+     * affect this config; {@code null} is stored as-is.
+     *
+     * @param headers values to be passed to {@code HttpFetcher#setHttpHeaders}
+     * @return this config
+     */
+    public HttpFetcherConfig setHeaders(List<String> headers) {
+        this.headers = headers == null ? null : new ArrayList<>(headers);
+        return this;
+    }
+
+    public long getOverallTimeout() {
+        return overallTimeout;
+    }
+
+    public HttpFetcherConfig setOverallTimeout(long overallTimeout) {
+        this.overallTimeout = overallTimeout;
+        return this;
+    }
+
+    public int getMaxErrMsgSize() {
+        return maxErrMsgSize;
+    }
+
+    public HttpFetcherConfig setMaxErrMsgSize(int maxErrMsgSize) {
+        this.maxErrMsgSize = maxErrMsgSize;
+        return this;
+    }
+
+    public String getUserAgent() {
+        return userAgent;
+    }
+
+    public HttpFetcherConfig setUserAgent(String userAgent) {
+        this.userAgent = userAgent;
+        return this;
+    }
+}
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index b57c361b9..62eb4ec6c 100644
--- a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -57,6 +57,7 @@ import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
import org.apache.tika.pipes.fetcher.RangeFetcher;
+import org.apache.tika.pipes.fetcher.s3.config.S3FetcherConfig;
import org.apache.tika.utils.StringUtils;
/**
@@ -65,6 +66,42 @@ import org.apache.tika.utils.StringUtils;
* initialization, and the fetch key is "path/to/my_file.pdf".
*/
public class S3Fetcher extends AbstractFetcher implements Initializable, RangeFetcher {
+    /**
+     * Creates a fetcher with all settings left at their initial values.
+     */
+    public S3Fetcher() {
+        // intentionally empty
+    }
+
+    /**
+     * Creates a fetcher and copies every setting of the given config bean into it
+     * through the matching setter.
+     * <p>
+     * NOTE(review): reference-typed values the caller never set (for example the
+     * credentials provider or the throttle seconds) are passed on as {@code null};
+     * confirm the corresponding setters accept that. The config's
+     * {@code commaDelimitedLongs} value is not read here.
+     *
+     * @param s3FetcherConfig source of the settings applied below
+     */
+    public S3Fetcher(S3FetcherConfig s3FetcherConfig) {
+        this();
+        this.setBucket(s3FetcherConfig.getBucket());
+        this.setRegion(s3FetcherConfig.getRegion());
+        this.setProfile(s3FetcherConfig.getProfile());
+        this.setAccessKey(s3FetcherConfig.getAccessKey());
+        this.setSecretKey(s3FetcherConfig.getSecretKey());
+        this.setPrefix(s3FetcherConfig.getPrefix());
+        this.setCredentialsProvider(s3FetcherConfig.getCredentialsProvider());
+        this.setEndpointConfigurationService(s3FetcherConfig.getEndpointConfigurationService());
+
+        this.setMaxConnections(s3FetcherConfig.getMaxConnections());
+        this.setSpoolToTemp(s3FetcherConfig.isSpoolToTemp());
+        this.setThrottleSeconds(s3FetcherConfig.getThrottleSeconds());
+        this.setMaxLength(s3FetcherConfig.getMaxLength());
+        this.setExtractUserMetadata(s3FetcherConfig.isExtractUserMetadata());
+        this.setPathStyleAccessEnabled(s3FetcherConfig.isPathStyleAccessEnabled());
+    }
private static final Logger LOGGER = LoggerFactory.getLogger(S3Fetcher.class);
private static final String PREFIX = "s3";
diff --git a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/config/S3FetcherConfig.java b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/config/S3FetcherConfig.java
new file mode 100644
index 000000000..8a91691e2
--- /dev/null
+++ b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/config/S3FetcherConfig.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.fetcher.s3.config;
+
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * Settings bean for {@code S3Fetcher}. Its values are copied into the fetcher by the
+ * {@code S3Fetcher(S3FetcherConfig)} constructor.
+ * Every setter returns {@code this} so calls can be chained.
+ * <p>
+ * {@link #getSecretKey()} holds a credential; take care not to log this bean's values.
+ */
+public class S3FetcherConfig extends AbstractConfig {
+    private boolean spoolToTemp;
+    private String region;
+    private String profile;
+    private String bucket;
+    // NOTE(review): not read by the S3Fetcher(S3FetcherConfig) constructor; confirm
+    // whether it is meant to be a string form of throttleSeconds.
+    private String commaDelimitedLongs;
+    private String prefix;
+    private boolean extractUserMetadata;
+    private int maxConnections;
+    private String credentialsProvider;
+    private long maxLength;
+    private String accessKey;
+    private String secretKey;
+    private String endpointConfigurationService;
+    private boolean pathStyleAccessEnabled;
+    private long[] throttleSeconds;
+
+    public boolean isSpoolToTemp() {
+        return spoolToTemp;
+    }
+
+    public S3FetcherConfig setSpoolToTemp(boolean spoolToTemp) {
+        this.spoolToTemp = spoolToTemp;
+        return this;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public S3FetcherConfig setRegion(String region) {
+        this.region = region;
+        return this;
+    }
+
+    public String getProfile() {
+        return profile;
+    }
+
+    public S3FetcherConfig setProfile(String profile) {
+        this.profile = profile;
+        return this;
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public S3FetcherConfig setBucket(String bucket) {
+        this.bucket = bucket;
+        return this;
+    }
+
+    public String getCommaDelimitedLongs() {
+        return commaDelimitedLongs;
+    }
+
+    public S3FetcherConfig setCommaDelimitedLongs(String commaDelimitedLongs) {
+        this.commaDelimitedLongs = commaDelimitedLongs;
+        return this;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+
+    public S3FetcherConfig setPrefix(String prefix) {
+        this.prefix = prefix;
+        return this;
+    }
+
+    public boolean isExtractUserMetadata() {
+        return extractUserMetadata;
+    }
+
+    public S3FetcherConfig setExtractUserMetadata(boolean extractUserMetadata) {
+        this.extractUserMetadata = extractUserMetadata;
+        return this;
+    }
+
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    public S3FetcherConfig setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+        return this;
+    }
+
+    public String getCredentialsProvider() {
+        return credentialsProvider;
+    }
+
+    public S3FetcherConfig setCredentialsProvider(String credentialsProvider) {
+        this.credentialsProvider = credentialsProvider;
+        return this;
+    }
+
+    public long getMaxLength() {
+        return maxLength;
+    }
+
+    public S3FetcherConfig setMaxLength(long maxLength) {
+        this.maxLength = maxLength;
+        return this;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public S3FetcherConfig setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+        return this;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public S3FetcherConfig setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+        return this;
+    }
+
+    public String getEndpointConfigurationService() {
+        return endpointConfigurationService;
+    }
+
+    public S3FetcherConfig setEndpointConfigurationService(String endpointConfigurationService) {
+        this.endpointConfigurationService = endpointConfigurationService;
+        return this;
+    }
+
+    public boolean isPathStyleAccessEnabled() {
+        return pathStyleAccessEnabled;
+    }
+
+    public S3FetcherConfig setPathStyleAccessEnabled(boolean pathStyleAccessEnabled) {
+        this.pathStyleAccessEnabled = pathStyleAccessEnabled;
+        return this;
+    }
+
+    /**
+     * @return a copy of the configured throttle seconds, or {@code null} if unset
+     */
+    public long[] getThrottleSeconds() {
+        return throttleSeconds == null ? null : throttleSeconds.clone();
+    }
+
+    /**
+     * Stores a copy of {@code throttleSeconds}, so later changes to the caller's
+     * array do not affect this config; {@code null} is stored as-is.
+     *
+     * @param throttleSeconds values to be passed to {@code S3Fetcher#setThrottleSeconds}
+     * @return this config
+     */
+    public S3FetcherConfig setThrottleSeconds(long[] throttleSeconds) {
+        this.throttleSeconds = throttleSeconds == null ? null : throttleSeconds.clone();
+        return this;
+    }
+}
diff --git a/tika-pipes/tika-grpc/pom.xml b/tika-pipes/tika-grpc/pom.xml
index 121baff41..a4d7486f8 100644
--- a/tika-pipes/tika-grpc/pom.xml
+++ b/tika-pipes/tika-grpc/pom.xml
@@ -5,8 +5,7 @@
<packaging>jar</packaging>
<!-- Feel free to delete the comment at the end of these lines. It is just
for safely updating the version in our release process. -->
- <version>1.60.0</version><!-- CURRENT_GRPC_VERSION -->
- <name>Apache Tika Pipes GRPC Server</name>
+ <name>Apache Tika pipes gRPC server</name>
<url>https://tika.apache.org/</url>
<parent>
@@ -21,9 +20,7 @@
<grpc.version>1.60.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.24.0</protobuf.version>
<protoc.version>3.24.0</protoc.version>
- <!-- required for JDK 8 -->
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
+ <maven.compiler.release>11</maven.compiler.release>
</properties>
<dependencyManagement>
@@ -76,21 +73,20 @@
<artifactId>j2objc-annotations</artifactId>
<version>2.8</version> <!-- prevent downgrade of version in guava -->
</dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-pipes -->
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-async-cli</artifactId>
- <version>2.9.1</version>
+ <version>3.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-parsers-standard-package</artifactId>
- <version>2.9.1</version>
+ <version>3.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
- <version>2.9.1</version>
+ <version>3.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
@@ -115,6 +111,30 @@
<version>3.4.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-fetcher-http</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-fetcher-gcs</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-fetcher-az-blob</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-fetcher-s3</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServer.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServer.java
new file mode 100644
index 000000000..26493a065
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServer.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.Grpc;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server that manages startup/shutdown of the GRPC Tika server.
+ * <p>
+ * Usage: {@code TikaGrpcServer <tika-config-path>}. The listening port comes from the
+ * {@code server.port} system property (default 50051), and the server uses plaintext
+ * ({@link InsecureServerCredentials}) transport.
+ */
+public class TikaGrpcServer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TikaGrpcServer.class);
+    /** Seconds {@link #stop()} waits for in-flight calls before cancelling them. */
+    private static final long SHUTDOWN_GRACE_SECONDS = 30;
+    private Server server;
+    /** Tika config path handed to {@link TikaGrpcServerImpl}; set by {@link #main}. */
+    private static String tikaConfigPath;
+
+    private void start() throws Exception {
+        /* The port on which the server should run */
+        int port = Integer.parseInt(System.getProperty("server.port", "50051"));
+        server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
+                .addService(new TikaGrpcServerImpl(tikaConfigPath)).build().start();
+        LOGGER.info("Server started, listening on {}", port);
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+            System.err.println("*** shutting down gRPC server since JVM is shutting down");
+            try {
+                TikaGrpcServer.this.stop();
+            } catch (InterruptedException e) {
+                e.printStackTrace(System.err);
+                // Restore the interrupt flag rather than silently clearing it.
+                Thread.currentThread().interrupt();
+            }
+            System.err.println("*** server shut down");
+        }));
+    }
+
+    /**
+     * Starts an orderly shutdown and waits up to {@link #SHUTDOWN_GRACE_SECONDS}
+     * seconds; calls still running after that are cancelled with shutdownNow().
+     */
+    private void stop() throws InterruptedException {
+        if (server != null) {
+            server.shutdown();
+            if (!server.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
+                server.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Await termination on the main thread since the grpc library uses daemon threads.
+     */
+    private void blockUntilShutdown() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+
+    /**
+     * Main launches the server from the command line.
+     *
+     * @param args {@code args[0]} is the Tika config path passed to {@link TikaGrpcServerImpl}
+     * @throws IllegalArgumentException if no config path is given
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            throw new IllegalArgumentException(
+                    "Usage: TikaGrpcServer <path-to-tika-config.xml>");
+        }
+        tikaConfigPath = args[0];
+        final TikaGrpcServer server = new TikaGrpcServer();
+        server.start();
+        server.blockUntilShutdown();
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
new file mode 100644
index 000000000..697ebcac5
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaGrpcServerImpl.java
@@ -0,0 +1,278 @@
+package org.apache.tika.pipes.grpc;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.DeleteFetcherReply;
+import org.apache.tika.DeleteFetcherRequest;
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
+import org.apache.tika.GetFetcherReply;
+import org.apache.tika.GetFetcherRequest;
+import org.apache.tika.ListFetchersReply;
+import org.apache.tika.ListFetchersRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.UpdateFetcherReply;
+import org.apache.tika.UpdateFetcherRequest;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesClient;
+import org.apache.tika.pipes.PipesConfig;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.config.AbstractConfig;
+
+/**
+ * gRPC service that creates, updates, lists and deletes fetchers at runtime and parses
+ * documents through a {@link PipesClient}.
+ *
+ * <p>After every create/update/delete, the {@code <fetchers>} section of the config file at
+ * {@link #tikaConfigPath} is rewritten to match the registered fetchers. Compound updates of
+ * the two fetcher maps and rewrites of the config file run under this instance's monitor,
+ * because gRPC may invoke the handlers concurrently.
+ */
+class TikaGrpcServerImpl extends TikaGrpc.TikaImplBase {
+    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    /**
+     * Registered fetchers, keyed by fetcher name.
+     */
+    Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
+    /**
+     * The config object each registered fetcher was built from, keyed by fetcher name.
+     */
+    Map<String, AbstractConfig> fetcherConfigs = Collections.synchronizedMap(new HashMap<>());
+    PipesConfig pipesConfig;
+    PipesClient pipesClient;
+
+    String tikaConfigPath;
+
+    /**
+     * @param tikaConfigPath path of the tika config file; the pipes settings are loaded from it
+     *                       and its {@code <fetchers>} section is rewritten by this service
+     */
+    TikaGrpcServerImpl(String tikaConfigPath)
+            throws TikaConfigException, IOException, ParserConfigurationException,
+            TransformerException, SAXException {
+        this.tikaConfigPath = tikaConfigPath;
+        // Load pipes settings from the same file this service maintains, not a fixed
+        // "tika-config.xml" in the working directory.
+        this.pipesConfig = PipesConfig.load(Paths.get(tikaConfigPath));
+        this.pipesClient = new PipesClient(pipesConfig);
+        updateTikaConfig();
+    }
+
+    /**
+     * Rewrites the {@code <fetchers>} element of the config file so that it contains exactly
+     * one {@code <fetcher>} per registered fetcher. The element is created if it is missing.
+     */
+    private synchronized void updateTikaConfig()
+            throws ParserConfigurationException, IOException, SAXException, TransformerException {
+        Document tikaConfigDoc =
+                DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(tikaConfigPath);
+        Element fetchersElement = (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
+        if (fetchersElement == null) {
+            fetchersElement = tikaConfigDoc.createElement("fetchers");
+            tikaConfigDoc.getDocumentElement().appendChild(fetchersElement);
+        }
+        // Remove from the front until empty. Removing by a forward index skips every other
+        // child, because the live child list shifts left after each removal.
+        while (fetchersElement.hasChildNodes()) {
+            fetchersElement.removeChild(fetchersElement.getFirstChild());
+        }
+        // Safe to iterate: every mutation of the maps also holds this instance's monitor.
+        for (Map.Entry<String, AbstractFetcher> fetcherEntry : fetchers.entrySet()) {
+            AbstractFetcher fetcherObject = fetcherEntry.getValue();
+            Map<String, Object> fetcherConfigParams = OBJECT_MAPPER.convertValue(
+                    fetcherConfigs.get(fetcherEntry.getKey()), new TypeReference<>() {
+                    });
+            Element fetcher = tikaConfigDoc.createElement("fetcher");
+            fetcher.setAttribute("class", fetcherObject.getClass().getName());
+            Element fetcherName = tikaConfigDoc.createElement("name");
+            fetcherName.setTextContent(fetcherObject.getName());
+            fetcher.appendChild(fetcherName);
+            populateFetcherConfigs(fetcherConfigParams, tikaConfigDoc, fetcher);
+            fetchersElement.appendChild(fetcher);
+        }
+        DOMSource source = new DOMSource(tikaConfigDoc);
+        // Close (and thereby flush) the writer; otherwise buffered output may never reach disk.
+        try (FileWriter writer = new FileWriter(tikaConfigPath, StandardCharsets.UTF_8)) {
+            Transformer transformer = TransformerFactory.newInstance().newTransformer();
+            transformer.transform(source, new StreamResult(writer));
+        }
+    }
+
+    /**
+     * Appends one child element per non-null config property to {@code fetcher}, using the
+     * property name as the tag and its string form as the text.
+     */
+    private void populateFetcherConfigs(Map<String, Object> fetcherConfigParams,
+            Document tikaConfigDoc, Element fetcher) {
+        if (fetcherConfigParams == null) {
+            return;
+        }
+        for (Map.Entry<String, Object> configParam : fetcherConfigParams.entrySet()) {
+            // Unset properties are skipped so they are not written as the literal text "null".
+            if (configParam.getValue() == null) {
+                continue;
+            }
+            Element configElm = tikaConfigDoc.createElement(configParam.getKey());
+            configElm.setTextContent(Objects.toString(configParam.getValue()));
+            fetcher.appendChild(configElm);
+        }
+    }
+
+    /**
+     * Completes {@code responseObserver} with a gRPC error carrying {@code description}, rather
+     * than letting an exception escape the handler.
+     */
+    private static void sendError(StreamObserver<?> responseObserver, Status status,
+            String description, Throwable cause) {
+        responseObserver.onError(
+                status.withDescription(description).withCause(cause).asRuntimeException());
+    }
+
+    @Override
+    public void createFetcher(CreateFetcherRequest request,
+            StreamObserver<CreateFetcherReply> responseObserver) {
+        try {
+            createFetcher(request.getName(), request.getFetcherClass(), request.getParamsMap(),
+                    createTikaParamMap(request.getParamsMap()));
+            updateTikaConfig();
+        } catch (Exception e) {
+            sendError(responseObserver, Status.INVALID_ARGUMENT,
+                    "Could not create fetcher " + request.getName(), e);
+            return;
+        }
+        responseObserver.onNext(
+                CreateFetcherReply.newBuilder().setMessage(request.getName()).build());
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Instantiates {@code fetcherClassName} and registers it under {@code name}. Any fetcher
+     * already registered under that name is replaced, but only after the new one has been built
+     * successfully.
+     *
+     * <p>The config class is resolved by convention as
+     * {@code <fetcher package>.config.<FetcherSimpleName>Config}. {@code paramsMap} is bound to
+     * it with Jackson and passed to the fetcher's single-argument constructor.
+     *
+     * @throws RuntimeException wrapping any reflection or configuration failure; a
+     *                          {@link ClassCastException} if either class has the wrong type
+     */
+    private synchronized void createFetcher(String name, String fetcherClassName,
+            Map<String, String> paramsMap, Map<String, Param> tikaParamsMap) {
+        try {
+            Class<? extends AbstractFetcher> fetcherClass =
+                    Class.forName(fetcherClassName).asSubclass(AbstractFetcher.class);
+            String configClassName = fetcherClass.getPackageName() + ".config." +
+                    fetcherClass.getSimpleName() + "Config";
+            Class<? extends AbstractConfig> configClass =
+                    Class.forName(configClassName).asSubclass(AbstractConfig.class);
+            AbstractConfig configObject = OBJECT_MAPPER.convertValue(paramsMap, configClass);
+            AbstractFetcher abstractFetcher =
+                    fetcherClass.getDeclaredConstructor(configClass).newInstance(configObject);
+            abstractFetcher.setName(name);
+            if (abstractFetcher instanceof Initializable) {
+                ((Initializable) abstractFetcher).initialize(tikaParamsMap);
+            }
+            fetchers.put(name, abstractFetcher);
+            fetcherConfigs.put(name, configObject);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
+                 InvocationTargetException | NoSuchMethodException | TikaConfigException e) {
+            throw new RuntimeException(
+                    "Could not create fetcher " + name + " of class " + fetcherClassName, e);
+        }
+    }
+
+    /**
+     * Wraps each request parameter in a Tika {@link Param} keyed by the same name.
+     */
+    private static Map<String, Param> createTikaParamMap(Map<String, String> paramsMap) {
+        Map<String, Param> tikaParamsMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : paramsMap.entrySet()) {
+            tikaParamsMap.put(entry.getKey(), new Param<>(entry.getKey(), entry.getValue()));
+        }
+        return tikaParamsMap;
+    }
+
+    @Override
+    public void fetchAndParse(FetchAndParseRequest request,
+            StreamObserver<FetchAndParseReply> responseObserver) {
+        AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+        if (fetcher == null) {
+            sendError(responseObserver, Status.NOT_FOUND,
+                    "Could not find fetcher with name " + request.getFetcherName(), null);
+            return;
+        }
+        // NOTE(review): tikaMetadata is built from the request but is not passed to the
+        // FetchEmitTuple below, so request metadata currently has no effect.
+        Metadata tikaMetadata = new Metadata();
+        for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
+            tikaMetadata.add(entry.getKey(), entry.getValue());
+        }
+        PipesResult pipesResult;
+        try {
+            pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
+                    new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
+                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        } catch (IOException e) {
+            sendError(responseObserver, Status.INTERNAL,
+                    "Failed to fetch and parse " + request.getFetchKey(), e);
+            return;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            sendError(responseObserver, Status.CANCELLED,
+                    "Interrupted while processing " + request.getFetchKey(), e);
+            return;
+        }
+        var emitData = pipesResult.getEmitData();
+        if (emitData == null) {
+            sendError(responseObserver, Status.INTERNAL, "No parse output for " +
+                    request.getFetchKey() + ", status " + pipesResult.getStatus(), null);
+            return;
+        }
+        FetchAndParseReply.Builder fetchReplyBuilder =
+                FetchAndParseReply.newBuilder().setFetchKey(request.getFetchKey());
+        // FetchAndParse is declared unary in tika.proto, so exactly one reply must be sent.
+        // Only the first metadata object is returned (presumably the container document);
+        // any further entries are not included.
+        var metadataList = emitData.getMetadataList();
+        if (!metadataList.isEmpty()) {
+            Metadata metadata = metadataList.get(0);
+            for (String name : metadata.names()) {
+                String value = metadata.get(name);
+                if (value != null) {
+                    fetchReplyBuilder.putFields(name, value);
+                }
+            }
+        }
+        responseObserver.onNext(fetchReplyBuilder.build());
+        responseObserver.onCompleted();
+    }
+
+    @Override
+    public void updateFetcher(UpdateFetcherRequest request,
+            StreamObserver<UpdateFetcherReply> responseObserver) {
+        try {
+            // createFetcher overwrites the registration only once the new fetcher is built,
+            // so a failed update leaves the previous fetcher in place.
+            createFetcher(request.getName(), request.getFetcherClass(), request.getParamsMap(),
+                    createTikaParamMap(request.getParamsMap()));
+            updateTikaConfig();
+        } catch (Exception e) {
+            sendError(responseObserver, Status.INVALID_ARGUMENT,
+                    "Could not update fetcher " + request.getName(), e);
+            return;
+        }
+        responseObserver.onNext(
+                UpdateFetcherReply.newBuilder().setMessage(request.getName()).build());
+        responseObserver.onCompleted();
+    }
+
+    @Override
+    public void getFetcher(GetFetcherRequest request,
+            StreamObserver<GetFetcherReply> responseObserver) {
+        GetFetcherReply.Builder getFetcherReply = createFetcherReply(request.getName());
+        if (getFetcherReply == null) {
+            sendError(responseObserver, Status.NOT_FOUND,
+                    "Could not find fetcher with name " + request.getName(), null);
+            return;
+        }
+        responseObserver.onNext(getFetcherReply.build());
+        responseObserver.onCompleted();
+    }
+
+    @Override
+    public void listFetchers(ListFetchersRequest request,
+            StreamObserver<ListFetchersReply> responseObserver) {
+        ListFetchersReply.Builder listFetchersReplyBuilder = ListFetchersReply.newBuilder();
+        // NOTE(review): pageNumber / numFetchersPerPage from the request are not applied.
+        synchronized (this) {
+            for (String name : fetcherConfigs.keySet()) {
+                GetFetcherReply.Builder replyBuilder = createFetcherReply(name);
+                if (replyBuilder != null) {
+                    listFetchersReplyBuilder.addGetFetcherReply(replyBuilder.build());
+                }
+            }
+        }
+        responseObserver.onNext(listFetchersReplyBuilder.build());
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Builds the reply describing the fetcher registered under {@code name}, or returns
+     * {@code null} if no such fetcher exists. Config properties whose value is null are omitted.
+     */
+    private synchronized GetFetcherReply.Builder createFetcherReply(String name) {
+        AbstractFetcher abstractFetcher = fetchers.get(name);
+        AbstractConfig abstractConfig = fetcherConfigs.get(name);
+        if (abstractFetcher == null || abstractConfig == null) {
+            return null;
+        }
+        GetFetcherReply.Builder replyBuilder = GetFetcherReply.newBuilder()
+                .setFetcherClass(abstractFetcher.getClass().getName())
+                .setName(abstractFetcher.getName());
+        Map<String, Object> paramMap =
+                OBJECT_MAPPER.convertValue(abstractConfig, new TypeReference<>() {
+                });
+        paramMap.forEach((k, v) -> {
+            if (v != null) {
+                replyBuilder.putParams(k, v.toString());
+            }
+        });
+        return replyBuilder;
+    }
+
+    @Override
+    public void deleteFetcher(DeleteFetcherRequest request,
+            StreamObserver<DeleteFetcherReply> responseObserver) {
+        boolean deleted = deleteFetcher(request.getName());
+        try {
+            updateTikaConfig();
+        } catch (Exception e) {
+            sendError(responseObserver, Status.INTERNAL,
+                    "Could not persist deletion of fetcher " + request.getName(), e);
+            return;
+        }
+        // Always complete the call; without a reply the client would wait indefinitely.
+        responseObserver.onNext(DeleteFetcherReply.newBuilder().setSuccess(deleted).build());
+        responseObserver.onCompleted();
+    }
+
+    /**
+     * Unregisters {@code name}.
+     *
+     * @return whether a fetcher with that name was registered
+     */
+    private synchronized boolean deleteFetcher(String name) {
+        fetcherConfigs.remove(name);
+        return fetchers.remove(name) != null;
+    }
+}
\ No newline at end of file
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
deleted file mode 100644
index 218f81d2c..000000000
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Copyright 2015 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tika.pipes.grpc;
-
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import io.grpc.Grpc;
-import io.grpc.InsecureServerCredentials;
-import io.grpc.Server;
-import io.grpc.stub.StreamObserver;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.xml.sax.SAXException;
-
-import org.apache.tika.CreateFetcherReply;
-import org.apache.tika.CreateFetcherRequest;
-import org.apache.tika.FetchReply;
-import org.apache.tika.FetchRequest;
-import org.apache.tika.TikaGrpc;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.PipesClient;
-import org.apache.tika.pipes.PipesConfig;
-import org.apache.tika.pipes.PipesResult;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
-
-/**
- * Server that manages startup/shutdown of a server.
- */
-public class TikaServer {
- private static final Logger logger = Logger.getLogger(TikaServer.class.getName());
- private Server server;
-
- private static String tikaConfigPath;
-
- private void start() throws Exception {
- /* The port on which the server should run */
- int port = 50051;
- server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
- .addService(new TikaServerImpl()).build().start();
- logger.info("Server started, listening on " + port);
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- System.err.println("*** shutting down gRPC server since JVM is shutting down");
- try {
- TikaServer.this.stop();
- } catch (InterruptedException e) {
- e.printStackTrace(System.err);
- }
- System.err.println("*** server shut down");
- }));
- }
-
- private void stop() throws InterruptedException {
- if (server != null) {
- server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
- }
- }
-
- /**
- * Await termination on the main thread since the grpc library uses daemon threads.
- */
- private void blockUntilShutdown() throws InterruptedException {
- if (server != null) {
- server.awaitTermination();
- }
- }
-
- /**
- * Main launches the server from the command line.
- */
- public static void main(String[] args) throws Exception {
- tikaConfigPath = args[0];
- final TikaServer server = new TikaServer();
- server.start();
- server.blockUntilShutdown();
- }
-
- static class TikaServerImpl extends TikaGrpc.TikaImplBase {
- Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
- PipesConfig pipesConfig = PipesConfig.load(Paths.get("tika-config.xml"));
- PipesClient pipesClient = new PipesClient(pipesConfig);
-
- TikaServerImpl() throws TikaConfigException, IOException {
- }
-
- private void updateTikaConfig()
- throws ParserConfigurationException, IOException, SAXException,
- TransformerException {
- Document tikaConfigDoc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(tikaConfigPath);
- Element fetchersElement = (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
- for (int i = 0; i < fetchersElement.getChildNodes().getLength(); ++i) {
- fetchersElement.removeChild(fetchersElement.getChildNodes().item(i));
- }
- for (Map.Entry<String, AbstractFetcher> fetcherEntry : fetchers.entrySet()) {
- Element fetcher = tikaConfigDoc.createElement("fetcher");
- fetcher.setAttribute("class", fetcherEntry.getValue().getClass().getName());
- if (fetcherEntry.getValue() instanceof FileSystemFetcher) {
- FileSystemFetcher fileSystemFetcher = (FileSystemFetcher) fetcherEntry.getValue();
- Element fetcherName = tikaConfigDoc.createElement("name");
- fetcherName.setTextContent(fileSystemFetcher.getName());
- fetcher.appendChild(fetcherName);
- Element basePath = tikaConfigDoc.createElement("basePath");
- fetcher.appendChild(basePath);
- basePath.setTextContent(fileSystemFetcher.getBasePath().toAbsolutePath().toString());
- }
- fetchersElement.appendChild(fetcher);
- }
- DOMSource source = new DOMSource(tikaConfigDoc);
- FileWriter writer = new FileWriter(tikaConfigPath, StandardCharsets.UTF_8);
- StreamResult result = new StreamResult(writer);
-
- TransformerFactory transformerFactory = TransformerFactory.newInstance();
- Transformer transformer = transformerFactory.newTransformer();
- transformer.transform(source, result);
- }
-
- @Override
- public void createFetcher(CreateFetcherRequest request,
- StreamObserver<CreateFetcherReply> responseObserver) {
- CreateFetcherReply reply =
- CreateFetcherReply.newBuilder().setMessage(request.getName()).build();
- if (FileSystemFetcher.class.getName().equals(request.getFetcherClass())) {
- FileSystemFetcher fileSystemFetcher = new FileSystemFetcher();
- fileSystemFetcher.setName(request.getName());
- fileSystemFetcher.setBasePath(request.getParamsOrDefault("basePath", "."));
- fileSystemFetcher.setExtractFileSystemMetadata(Boolean.parseBoolean(request.getParamsOrDefault("extractFileSystemMetadata", "false")));
- Map<String, String> paramsMap = request.getParamsMap();
- Map<String, Param> tikaParamsMap = new HashMap<>();
- for (Map.Entry<String, String> entry : paramsMap.entrySet()) {
- tikaParamsMap.put(entry.getKey(),
- new Param<>(entry.getKey(), entry.getValue()));
- }
- try {
- fileSystemFetcher.initialize(tikaParamsMap);
- } catch (TikaConfigException e) {
- throw new RuntimeException(e);
- }
- fetchers.put(request.getName(), fileSystemFetcher);
- }
- try {
- updateTikaConfig();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- responseObserver.onNext(reply);
- responseObserver.onCompleted();
- }
-
- @Override
- public void fetch(FetchRequest request, StreamObserver<FetchReply> responseObserver) {
- AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
- if (fetcher == null) {
- throw new RuntimeException("Could not find fetcher with name " + request.getFetcherName());
- }
- Metadata tikaMetadata = new Metadata();
- for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
- tikaMetadata.add(entry.getKey(), entry.getValue());
- }
- try {
- PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
- new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(), FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
- for (Metadata metadata : pipesResult.getEmitData().getMetadataList()) {
- FetchReply.Builder fetchReplyBuilder = FetchReply.newBuilder()
- .setFetchKey(request.getFetchKey());
- for (String name : metadata.names()) {
- String value = metadata.get(name);
- if (value != null) {
- fetchReplyBuilder.putFields(name, value);
- }
- }
- responseObserver.onNext(fetchReplyBuilder.build());
- }
- responseObserver.onCompleted();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-}
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
index f2b350f5e..c23c678d4 100644
--- a/tika-pipes/tika-grpc/src/main/proto/tika.proto
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -21,8 +21,12 @@ option objc_class_prefix = "HLW";
package tika;
service Tika {
- rpc CreateFetcher (CreateFetcherRequest) returns (CreateFetcherReply) {}
- rpc Fetch (FetchRequest) returns (FetchReply) {}
+ // Instantiates fetcherClass configured from params and registers it under name;
+ // the reply message echoes the name.
+ rpc CreateFetcher(CreateFetcherRequest) returns (CreateFetcherReply) {}
+ // Replaces (or creates) the fetcher registered under name.
+ rpc UpdateFetcher(UpdateFetcherRequest) returns (UpdateFetcherReply) {}
+ // Returns the configuration params of the fetcher registered under name.
+ rpc GetFetcher(GetFetcherRequest) returns (GetFetcherReply) {}
+ // Returns the registered fetchers.
+ rpc ListFetchers(ListFetchersRequest) returns (ListFetchersReply) {}
+ // Unregisters the fetcher registered under name.
+ rpc DeleteFetcher(DeleteFetcherRequest) returns (DeleteFetcherReply) {}
+ // Fetches fetchKey with the named fetcher, parses it, and returns metadata as string fields.
+ // Unary: the server may send exactly one FetchAndParseReply per call.
+ rpc FetchAndParse(FetchAndParseRequest) returns (FetchAndParseReply) {}
}
message CreateFetcherRequest {
@@ -35,13 +39,50 @@ message CreateFetcherReply {
string message = 1;
}
-message FetchRequest {
+// Same fields as CreateFetcherRequest; the new fetcher replaces any fetcher already
+// registered under name.
+message UpdateFetcherRequest {
+ string name = 1;
+ string fetcherClass = 2;
+ map<string,string> params = 3;
+}
+
+message UpdateFetcherReply {
+ string message = 1;
+}
+
+// fetcherName selects a fetcher registered via CreateFetcher; fetchKey is what that fetcher
+// retrieves. NOTE(review): metadata is not currently forwarded to the parse by the server.
+message FetchAndParseRequest {
string fetcherName = 1;
string fetchKey = 2;
map<string,string> metadata = 3;
}
-message FetchReply {
+// fields maps parsed metadata names to values for the fetched document.
+message FetchAndParseReply {
string fetchKey = 1;
map<string,string> fields = 2;
}
+
+message DeleteFetcherRequest {
+ string name = 1;
+}
+
+// NOTE(review): presumably true when a fetcher with the requested name existed; confirm
+// against the server implementation.
+message DeleteFetcherReply {
+ bool success = 1;
+}
+
+message GetFetcherRequest {
+ string name = 1;
+}
+
+// Name, class and configuration params of one registered fetcher; also the element type of
+// ListFetchersReply.
+message GetFetcherReply {
+ string name = 1;
+ string fetcherClass = 2;
+ map<string,string> params = 3;
+}
+
+// NOTE(review): the paging fields are not applied by TikaGrpcServerImpl.listFetchers.
+message ListFetchersRequest {
+ int32 pageNumber = 1;
+ int32 numFetchersPerPage = 2;
+}
+
+message ListFetchersReply {
+ repeated GetFetcherReply getFetcherReply = 1;
+}
\ No newline at end of file
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
similarity index 92%
rename from tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
rename to tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
index 17efe060f..332654002 100644
--- a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaClient.java
@@ -29,8 +29,8 @@ import io.grpc.StatusRuntimeException;
import org.apache.tika.CreateFetcherReply;
import org.apache.tika.CreateFetcherRequest;
-import org.apache.tika.FetchReply;
-import org.apache.tika.FetchRequest;
+import org.apache.tika.FetchAndParseReply;
+import org.apache.tika.FetchAndParseRequest;
import org.apache.tika.TikaGrpc;
import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
@@ -58,10 +58,10 @@ public class TikaClient {
logger.info("Create fetcher: " + response.getMessage());
}
- public void fetch(FetchRequest fetchRequest) {
- FetchReply fetchReply;
+ public void fetchAndParse(FetchAndParseRequest fetchAndParseRequest) {
+ FetchAndParseReply fetchReply;
try {
- fetchReply = blockingStub.fetch(fetchRequest);
+ fetchReply = blockingStub.fetchAndParse(fetchAndParseRequest);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
@@ -96,7 +96,7 @@ public class TikaClient {
.putParams("extractFileSystemMetadata", "true")
.build());
- client.fetch(FetchRequest.newBuilder()
+ client.fetchAndParse(FetchAndParseRequest.newBuilder()
.setFetcherName(fetcherId)
.setFetchKey("000164.pdf")
.build());
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
new file mode 100644
index 000000000..b657fad9c
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaGrpcServerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.TikaGrpc;
+
+@RunWith(JUnit4.class)
+public class TikaGrpcServerTest {
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    /**
+     * Creating a fetcher over an in-process channel replies with the fetcher's name.
+     *
+     * <p>NOTE(review): constructing the service rewrites the {@code <fetchers>} section of the
+     * {@code tika-config.xml} in the working directory; consider copying it to a temp file.
+     */
+    @Test
+    public void createFetcher_repliesWithFetcherName() throws Exception {
+        // Generate a unique in-process server name.
+        String serverName = InProcessServerBuilder.generateName();
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
+                .addService(new TikaGrpcServerImpl("tika-config.xml")).build().start());
+
+        TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(
+                // Create a client channel and register for automatic graceful shutdown.
+                grpcCleanup.register(
+                        InProcessChannelBuilder.forName(serverName).directExecutor().build()));
+
+        String testName = "test name";
+        // The service loads fetcherClass reflectively, so the request must name a real
+        // fetcher together with the params its config class binds.
+        CreateFetcherReply reply = blockingStub.createFetcher(CreateFetcherRequest.newBuilder()
+                .setName(testName)
+                .setFetcherClass("org.apache.tika.pipes.fetcher.fs.FileSystemFetcher")
+                .putParams("basePath", ".")
+                .putParams("extractFileSystemMetadata", "false")
+                .build());
+
+        assertEquals(testName, reply.getMessage());
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
deleted file mode 100644
index f100f676f..000000000
--- a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2016 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tika.pipes.grpc;
-
-import static org.junit.Assert.assertEquals;
-
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.inprocess.InProcessServerBuilder;
-import io.grpc.testing.GrpcCleanupRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import org.apache.tika.CreateFetcherReply;
-import org.apache.tika.CreateFetcherRequest;
-import org.apache.tika.TikaGrpc;
-import org.apache.tika.pipes.grpc.TikaServer;
-
-@RunWith(JUnit4.class)
-public class TikaServerTest {
- @Rule
- public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
-
- @Test
- public void greeterImpl_replyMessage() throws Exception {
- // Generate a unique in-process server name.
- String serverName = InProcessServerBuilder.generateName();
-
- // Create a server, add service, start, and register for automatic graceful shutdown.
- grpcCleanup.register(InProcessServerBuilder
- .forName(serverName).directExecutor().addService(new TikaServer.TikaServerImpl()).build().start());
-
- TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(
- // Create a client channel and register for automatic graceful shutdown.
- grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
-
-
- String testName = "test name";
- CreateFetcherReply reply =
- blockingStub.createFetcher(CreateFetcherRequest.newBuilder().setName(testName).build());
-
- assertEquals(testName, reply.getMessage());
- }
-}
diff --git a/tika-pipes/tika-grpc/tika-config.xml b/tika-pipes/tika-grpc/tika-config.xml
index b7f8c535c..19a03c922 100644
--- a/tika-pipes/tika-grpc/tika-config.xml
+++ b/tika-pipes/tika-grpc/tika-config.xml
@@ -27,9 +27,5 @@
</params>
</pipes>
<fetchers>
- <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
- <name>file-system-fetcher-fabd51ef-51c1-447c-818c-96af18b2a893</name>
- <basePath>C:\Users\nicho\Downloads\000</basePath>
- </fetcher>
</fetchers>
</properties>
\ No newline at end of file