You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/02/11 19:34:38 UTC
(tika) 01/01: TIKA-4181 - grpc server and client
This is an automated email from the ASF dual-hosted git repository.
ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 657067b86733205feb225543c4cc6d40c143425e
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Sun Feb 11 13:34:26 2024 -0600
TIKA-4181 - grpc server and client
---
.../java/org/apache/tika/pipes/PipesConfig.java | 7 +
.../org/apache/tika/pipes/PipesClientTest.java | 49 +++++
.../org/apache/tika/pipes/tika-sample-config.xml | 41 ++++
tika-pipes/pom.xml | 1 +
tika-pipes/tika-grpc/README.md | 13 ++
tika-pipes/tika-grpc/pom.xml | 187 ++++++++++++++++++
.../org/apache/tika/pipes/grpc/TikaClient.java | 112 +++++++++++
.../org/apache/tika/pipes/grpc/TikaServer.java | 217 +++++++++++++++++++++
tika-pipes/tika-grpc/src/main/proto/tika.proto | 47 +++++
.../org/apache/tika/pipes/grpc/TikaServerTest.java | 59 ++++++
tika-pipes/tika-grpc/tika-config.xml | 35 ++++
11 files changed, 768 insertions(+)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
index 06783d67c..b0e8649f9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaConfigException;
public class PipesConfig extends PipesConfigBase {
@@ -46,6 +47,12 @@ public class PipesConfig extends PipesConfigBase {
return pipesConfig;
}
+    /**
+     * Builds a {@link PipesConfig} from the {@code <pipes>} section of a Tika config read
+     * from the given stream.
+     * <p>
+     * NOTE(review): unlike {@link #load(java.nio.file.Path)}, this overload has no config file
+     * path to record; confirm that callers which later need one (e.g. to launch a forked
+     * process) use the Path overload instead.
+     *
+     * @param tikaConfigInputStream stream containing the Tika config XML
+     * @return the parsed pipes configuration
+     * @throws IOException         if the stream cannot be read
+     * @throws TikaConfigException if the {@code <pipes>} section is invalid
+     */
+    public static PipesConfig load(InputStream tikaConfigInputStream) throws IOException, TikaConfigException {
+        final PipesConfig loaded = new PipesConfig();
+        loaded.configure("pipes", tikaConfigInputStream);
+        return loaded;
+    }
+
private PipesConfig() {
}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
new file mode 100644
index 000000000..46c475546
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * End-to-end check that a {@link PipesClient} configured from {@code tika-sample-config.xml}
+ * fetches a test document through the {@code fs} fetcher and returns its metadata.
+ */
+class PipesClientTest {
+    String fetcherName = "fs";
+    String testPdfFile = "testOverlappingText.pdf";
+
+    private PipesClient pipesClient;
+
+    @BeforeEach
+    public void init()
+            throws TikaConfigException, IOException, ParserConfigurationException, SAXException {
+        Path tikaConfigPath = Paths.get("src", "test", "resources", "org", "apache", "tika",
+                "pipes", "tika-sample-config.xml");
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath);
+        pipesClient = new PipesClient(pipesConfig);
+    }
+
+    @Test
+    void process() throws IOException, InterruptedException {
+        // The client may fork a child JVM (see forkedJvmArgs in the sample config); closing it
+        // keeps that process from outliving the test.
+        try (PipesClient client = pipesClient) {
+            PipesResult pipesResult = client.process(new FetchEmitTuple(testPdfFile,
+                    new FetchKey(fetcherName, testPdfFile), new EmitKey(),
+                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+            // Report the status instead of failing later with a NullPointerException.
+            Assertions.assertNotNull(pipesResult.getEmitData(),
+                    () -> "no emit data; status=" + pipesResult.getStatus());
+            Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
+            Assertions.assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
+            Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
+            Assertions.assertEquals("testOverlappingText.pdf", metadata.get("resourceName"));
+        }
+    }
+}
\ No newline at end of file
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
new file mode 100644
index 000000000..c936852d9
--- /dev/null
+++ b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<properties>
+ <pipes>
+ <params>
+ <numClients>2</numClients>
+ <forkedJvmArgs>
+ <arg>-Xmx1g</arg>
+ <arg>-XX:ParallelGCThreads=2</arg>
+ </forkedJvmArgs>
+ <timeoutMillis>60000</timeoutMillis>
+ <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit -->
+ </params>
+ </pipes>
+ <autoDetectParserConfig>
+ <digesterFactory class="org.apache.tika.pipes.async.MockDigesterFactory">
+ <skipContainerDocument>false</skipContainerDocument>
+ </digesterFactory>
+ </autoDetectParserConfig>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+ <name>fs</name>
+ <basePath>src/test/resources/test-documents</basePath>
+ </fetcher>
+ </fetchers>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 4ef27a191..61738fcd9 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -36,6 +36,7 @@
<module>tika-pipes-iterators</module>
<module>tika-pipes-reporters</module>
<module>tika-async-cli</module>
+ <module>tika-grpc</module>
</modules>
<dependencies>
<dependency>
diff --git a/tika-pipes/tika-grpc/README.md b/tika-pipes/tika-grpc/README.md
new file mode 100644
index 000000000..7b0d4ccd6
--- /dev/null
+++ b/tika-pipes/tika-grpc/README.md
@@ -0,0 +1,13 @@
+# Tika Pipes gRPC Server
+
+This module contains the Tika Pipes gRPC server.
+
+The server is intended to manage a pool of Tika Pipes clients.
+
+* Tika Pipes fetcher CRUD operations (only Create is implemented so far)
+  * Create
+  * Read
+  * Update
+  * Delete
+* Fetch and parse a given item through a named fetcher
+
diff --git a/tika-pipes/tika-grpc/pom.xml b/tika-pipes/tika-grpc/pom.xml
new file mode 100644
index 000000000..121baff41
--- /dev/null
+++ b/tika-pipes/tika-grpc/pom.xml
@@ -0,0 +1,187 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+  <artifactId>tika-grpc</artifactId>
+  <packaging>jar</packaging>
+  <!-- No explicit <version>: the module inherits the Tika version from the tika-pipes parent
+       declared below (the gRPC library version lives in the grpc.version property). -->
+  <name>Apache Tika Pipes GRPC Server</name>
+  <url>https://tika.apache.org/</url>
+
+ <parent>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-pipes</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <grpc.version>1.60.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
+ <protobuf.version>3.24.0</protobuf.version>
+ <protoc.version>3.24.0</protoc.version>
+ <!-- required for JDK 8 -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-bom</artifactId>
+ <version>${grpc.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-services</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.10.1</version> <!-- prevent downgrade via protobuf-java-util -->
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>32.0.1-jre</version> <!-- prevent downgrade of version in protobuf-java-util -->
+ </dependency>
+ <dependency>
+ <groupId>com.google.j2objc</groupId>
+ <artifactId>j2objc-annotations</artifactId>
+ <version>2.8</version> <!-- prevent downgrade of version in guava -->
+ </dependency>
+    <!-- Tika modules are built in this reactor: depend on the project version rather than a
+         released one, since mixing 2.9.1 and 3.0.0-SNAPSHOT Tika artifacts can trip the
+         requireUpperBoundDeps enforcer rule configured below. -->
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-async-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-parsers-standard-package</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+ <dependency>
+ <groupId>org.apache.tomcat</groupId>
+ <artifactId>annotations-api</artifactId>
+ <version>6.0.53</version>
+ <scope>provided</scope> <!-- not needed at runtime -->
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>3.4.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.7.1</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.4.1</version>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireUpperBoundDeps/>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.5.0</version>
+        <executions>
+          <execution>
+            <id>add-generated-protobuf-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <!-- Register the protoc output directories as source roots. The build directory
+                   property already resolves to this module's target directory, so no manual
+                   path separator is needed. -->
+              <sources>
+                <source>${project.build.directory}/generated-sources/protobuf/grpc-java</source>
+                <source>${project.build.directory}/generated-sources/protobuf/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
new file mode 100644
index 000000000..17efe060f
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.grpc.Channel;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchReply;
+import org.apache.tika.FetchRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * Example command-line client for the Tika Pipes gRPC server: registers a file-system fetcher
+ * rooted at a base path and then fetches and parses one item through it.
+ */
+public class TikaClient {
+    private static final Logger logger = Logger.getLogger(TikaClient.class.getName());
+
+    /** Fetch key used by {@link #main} when none is given on the command line. */
+    private static final String DEFAULT_FETCH_KEY = "000164.pdf";
+
+    private final TikaGrpc.TikaBlockingStub blockingStub;
+
+    /**
+     * Creates a client that issues blocking calls over {@code channel}.
+     *
+     * @param channel channel to the server; it is not a ManagedChannel, so shutting it down is
+     *                the caller's responsibility. Passing channels in keeps the client easy to
+     *                test and lets callers reuse them.
+     */
+    public TikaClient(Channel channel) {
+        blockingStub = TikaGrpc.newBlockingStub(channel);
+    }
+
+    /**
+     * Asks the server to register a fetcher; RPC failures are logged, not thrown.
+     *
+     * @param createFileSystemFetcherRequest fetcher name, class and parameters
+     */
+    public void createFetcher(CreateFetcherRequest createFileSystemFetcherRequest) {
+        CreateFetcherReply response;
+        try {
+            response = blockingStub.createFetcher(createFileSystemFetcherRequest);
+        } catch (StatusRuntimeException e) {
+            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+            return;
+        }
+        logger.info("Create fetcher: " + response.getMessage());
+    }
+
+    /**
+     * Asks the server to fetch and parse one item and logs the returned metadata; RPC failures
+     * are logged, not thrown.
+     *
+     * @param fetchRequest fetcher name and fetch key
+     */
+    public void fetch(FetchRequest fetchRequest) {
+        FetchReply fetchReply;
+        try {
+            fetchReply = blockingStub.fetch(fetchRequest);
+        } catch (StatusRuntimeException e) {
+            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+            return;
+        }
+        logger.info("Fetch reply - tika parsed metadata: " + fetchReply.getFieldsMap());
+    }
+
+    /**
+     * Runs the example against {@code localhost:50051}.
+     *
+     * @param args {@code <basePath> [fetchKey]}: the directory the file-system fetcher is rooted
+     *             at, and optionally the key to fetch (defaults to {@value #DEFAULT_FETCH_KEY})
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1 || args.length > 2) {
+            System.err.println("Expects a base path to use for the crawl and an optional fetch key"
+                    + " (default " + DEFAULT_FETCH_KEY + ").");
+            System.exit(1);
+            return;
+        }
+        String crawlPath = args[0];
+        String fetchKey = args.length > 1 ? args[1] : DEFAULT_FETCH_KEY;
+        String target = "localhost:50051";
+        // Channels are thread-safe and reusable; create one up front and reuse it until shutdown.
+        // Plaintext credentials avoid needing TLS certificates for this example; use
+        // TlsChannelCredentials for TLS.
+        ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
+                .build();
+        try {
+            TikaClient client = new TikaClient(channel);
+            String fetcherId = "file-system-fetcher-" + UUID.randomUUID();
+
+            client.createFetcher(CreateFetcherRequest.newBuilder()
+                    .setName(fetcherId)
+                    .setFetcherClass(FileSystemFetcher.class.getName())
+                    .putParams("basePath", crawlPath)
+                    .putParams("extractFileSystemMetadata", "true")
+                    .build());
+
+            client.fetch(FetchRequest.newBuilder()
+                    .setFetcherName(fetcherId)
+                    .setFetchKey(fetchKey)
+                    .build());
+        } finally {
+            // ManagedChannels hold threads and TCP connections; shut this one down since it is
+            // not used again.
+            channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+        }
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
new file mode 100644
index 000000000..218f81d2c
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import io.grpc.Grpc;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchReply;
+import org.apache.tika.FetchRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.PipesClient;
+import org.apache.tika.pipes.PipesConfig;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * Server that manages startup/shutdown of a server.
+ */
+public class TikaServer {
+    private static final Logger logger = Logger.getLogger(TikaServer.class.getName());
+
+    /** Tika config file used when no path is passed on the command line. */
+    static final String DEFAULT_TIKA_CONFIG_PATH = "tika-config.xml";
+
+    /** The port on which the server listens. */
+    private static final int PORT = 50051;
+
+    /** Tika config file chosen by {@link #main}; {@code null} until main has run. */
+    private static String tikaConfigPath;
+
+    private Server server;
+
+    private void start() throws Exception {
+        server = Grpc.newServerBuilderForPort(PORT, InsecureServerCredentials.create())
+                .addService(new TikaServerImpl(tikaConfigPath)).build().start();
+        logger.info("Server started, listening on " + PORT);
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+            System.err.println("*** shutting down gRPC server since JVM is shutting down");
+            try {
+                TikaServer.this.stop();
+            } catch (InterruptedException e) {
+                e.printStackTrace(System.err);
+            }
+            System.err.println("*** server shut down");
+        }));
+    }
+
+    private void stop() throws InterruptedException {
+        if (server != null) {
+            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Await termination on the main thread since the grpc library uses daemon threads.
+     */
+    private void blockUntilShutdown() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+
+    /**
+     * Launches the server from the command line.
+     *
+     * @param args optional single argument: the Tika config file that supplies the pipes
+     *             settings and receives fetcher registrations (defaults to
+     *             {@value #DEFAULT_TIKA_CONFIG_PATH} instead of failing when omitted)
+     */
+    public static void main(String[] args) throws Exception {
+        tikaConfigPath = args.length > 0 ? args[0] : DEFAULT_TIKA_CONFIG_PATH;
+        final TikaServer server = new TikaServer();
+        server.start();
+        server.blockUntilShutdown();
+    }
+
+    /**
+     * gRPC service implementation. Fetchers registered through {@code CreateFetcher} are kept in
+     * memory and written into the {@code <fetchers>} section of the same Tika config file the
+     * {@link PipesClient} is loaded from.
+     */
+    static class TikaServerImpl extends TikaGrpc.TikaImplBase {
+        /**
+         * Registered fetchers by name. Collections.synchronizedMap requires callers to hold the
+         * map's monitor while iterating it.
+         */
+        private final Map<String, AbstractFetcher> fetchers =
+                Collections.synchronizedMap(new HashMap<>());
+        /** Tika config file used both to load the pipes config and to persist fetchers. */
+        private final String configPath;
+        private final PipesClient pipesClient;
+
+        /**
+         * Uses the path chosen by {@link TikaServer#main}, or {@code tika-config.xml} when main
+         * has not run (e.g. in tests).
+         */
+        TikaServerImpl() throws TikaConfigException, IOException {
+            this(tikaConfigPath != null ? tikaConfigPath : DEFAULT_TIKA_CONFIG_PATH);
+        }
+
+        /**
+         * @param configPath Tika config file to load pipes settings from and to write fetcher
+         *                   registrations to
+         */
+        TikaServerImpl(String configPath) throws TikaConfigException, IOException {
+            this.configPath = configPath;
+            this.pipesClient = new PipesClient(PipesConfig.load(Paths.get(configPath)));
+        }
+
+        /**
+         * Rewrites the {@code <fetchers>} section of the config file so it lists exactly the
+         * registered fetchers. For {@link FileSystemFetcher}s the name and absolute basePath are
+         * written; other fetcher types get only a class attribute.
+         */
+        private void updateTikaConfig()
+                throws ParserConfigurationException, IOException, SAXException,
+                TransformerException {
+            synchronized (fetchers) {
+                Document tikaConfigDoc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+                        .parse(Paths.get(configPath).toFile());
+                Element fetchersElement =
+                        (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
+                if (fetchersElement == null) {
+                    fetchersElement = tikaConfigDoc.createElement("fetchers");
+                    tikaConfigDoc.getDocumentElement().appendChild(fetchersElement);
+                }
+                // The child NodeList is live: removing by ascending index skips every other
+                // node, so always remove the current first child instead.
+                while (fetchersElement.hasChildNodes()) {
+                    fetchersElement.removeChild(fetchersElement.getFirstChild());
+                }
+                for (Map.Entry<String, AbstractFetcher> fetcherEntry : fetchers.entrySet()) {
+                    Element fetcher = tikaConfigDoc.createElement("fetcher");
+                    fetcher.setAttribute("class", fetcherEntry.getValue().getClass().getName());
+                    if (fetcherEntry.getValue() instanceof FileSystemFetcher) {
+                        FileSystemFetcher fileSystemFetcher =
+                                (FileSystemFetcher) fetcherEntry.getValue();
+                        Element fetcherName = tikaConfigDoc.createElement("name");
+                        fetcherName.setTextContent(fileSystemFetcher.getName());
+                        fetcher.appendChild(fetcherName);
+                        Element basePath = tikaConfigDoc.createElement("basePath");
+                        basePath.setTextContent(
+                                fileSystemFetcher.getBasePath().toAbsolutePath().toString());
+                        fetcher.appendChild(basePath);
+                    }
+                    fetchersElement.appendChild(fetcher);
+                }
+                Transformer transformer = TransformerFactory.newInstance().newTransformer();
+                // Closing the writer flushes it; an unclosed FileWriter can leave the file
+                // truncated or empty.
+                try (FileWriter writer = new FileWriter(configPath, StandardCharsets.UTF_8)) {
+                    transformer.transform(new DOMSource(tikaConfigDoc), new StreamResult(writer));
+                }
+            }
+        }
+
+        /**
+         * Registers {@code fetcher} under {@code name} and rewrites the config file. If the
+         * rewrite fails, the previous in-memory registration for {@code name} (if any) is
+         * restored before the exception is rethrown.
+         */
+        private void registerAndPersist(String name, AbstractFetcher fetcher)
+                throws ParserConfigurationException, IOException, SAXException,
+                TransformerException {
+            synchronized (fetchers) {
+                AbstractFetcher previous = fetchers.put(name, fetcher);
+                try {
+                    updateTikaConfig();
+                } catch (ParserConfigurationException | IOException | SAXException
+                         | TransformerException | RuntimeException e) {
+                    if (previous == null) {
+                        fetchers.remove(name);
+                    } else {
+                        fetchers.put(name, previous);
+                    }
+                    throw e;
+                }
+            }
+        }
+
+        /**
+         * Registers a {@link FileSystemFetcher} when that class is requested and replies with the
+         * requested name. Failures are reported via {@code onError} with INVALID_ARGUMENT (bad
+         * parameters) or INTERNAL (config file could not be written).
+         */
+        @Override
+        public void createFetcher(CreateFetcherRequest request,
+                                  StreamObserver<CreateFetcherReply> responseObserver) {
+            if (FileSystemFetcher.class.getName().equals(request.getFetcherClass())) {
+                FileSystemFetcher fileSystemFetcher = new FileSystemFetcher();
+                fileSystemFetcher.setName(request.getName());
+                fileSystemFetcher.setBasePath(request.getParamsOrDefault("basePath", "."));
+                fileSystemFetcher.setExtractFileSystemMetadata(Boolean.parseBoolean(
+                        request.getParamsOrDefault("extractFileSystemMetadata", "false")));
+                Map<String, Param> tikaParamsMap = new HashMap<>();
+                for (Map.Entry<String, String> entry : request.getParamsMap().entrySet()) {
+                    tikaParamsMap.put(entry.getKey(),
+                            new Param<>(entry.getKey(), entry.getValue()));
+                }
+                try {
+                    fileSystemFetcher.initialize(tikaParamsMap);
+                } catch (TikaConfigException e) {
+                    responseObserver.onError(Status.INVALID_ARGUMENT
+                            .withDescription("Invalid configuration for fetcher "
+                                    + request.getName() + ": " + e.getMessage())
+                            .withCause(e).asRuntimeException());
+                    return;
+                }
+                try {
+                    registerAndPersist(request.getName(), fileSystemFetcher);
+                } catch (Exception e) {
+                    responseObserver.onError(Status.INTERNAL
+                            .withDescription("Could not write fetcher " + request.getName()
+                                    + " to " + configPath)
+                            .withCause(e).asRuntimeException());
+                    return;
+                }
+            } else {
+                // NOTE(review): unsupported classes are still acknowledged without registering
+                // anything (kept for compatibility); consider replying INVALID_ARGUMENT.
+                logger.warning("Unsupported fetcher class '" + request.getFetcherClass()
+                        + "'; fetcher '" + request.getName() + "' was not registered");
+            }
+            responseObserver.onNext(
+                    CreateFetcherReply.newBuilder().setMessage(request.getName()).build());
+            responseObserver.onCompleted();
+        }
+
+        /**
+         * Fetches and parses {@code request.fetchKey} with the named fetcher and replies with the
+         * parsed metadata. Unknown fetchers yield NOT_FOUND; processing failures yield INTERNAL
+         * (or CANCELLED when interrupted).
+         */
+        @Override
+        public void fetch(FetchRequest request, StreamObserver<FetchReply> responseObserver) {
+            AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+            if (fetcher == null) {
+                responseObserver.onError(Status.NOT_FOUND
+                        .withDescription(
+                                "Could not find fetcher with name " + request.getFetcherName())
+                        .asRuntimeException());
+                return;
+            }
+            Metadata tikaMetadata = new Metadata();
+            for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
+                tikaMetadata.add(entry.getKey(), entry.getValue());
+            }
+            // Pass the caller's metadata along instead of silently dropping it.
+            FetchEmitTuple tuple = new FetchEmitTuple(request.getFetchKey(),
+                    new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
+                    tikaMetadata, HandlerConfig.DEFAULT_HANDLER_CONFIG,
+                    FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
+            PipesResult pipesResult;
+            try {
+                // NOTE(review): gRPC may run handlers concurrently; this assumes one PipesClient
+                // must not serve overlapping process() calls. A pool of clients (cf. numClients
+                // in the config) would allow parallel parsing.
+                synchronized (pipesClient) {
+                    pipesResult = pipesClient.process(tuple);
+                }
+            } catch (IOException e) {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Failed to process " + request.getFetchKey())
+                        .withCause(e).asRuntimeException());
+                return;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // Complete the call so the client is not left waiting forever.
+                responseObserver.onError(Status.CANCELLED
+                        .withDescription("Interrupted while processing " + request.getFetchKey())
+                        .withCause(e).asRuntimeException());
+                return;
+            }
+            if (pipesResult.getEmitData() == null) {
+                // Presumably a non-success status (fetch failure, timeout, skipped parse error).
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No parse output for " + request.getFetchKey()
+                                + " (status " + pipesResult.getStatus() + ")")
+                        .asRuntimeException());
+                return;
+            }
+            // Fetch is a unary RPC, so exactly one reply must be sent: calling onNext per
+            // metadata entry fails for documents with embedded items. Reply with the first
+            // entry (presumably the container document); embedded entries are not returned.
+            FetchReply.Builder fetchReplyBuilder = FetchReply.newBuilder()
+                    .setFetchKey(request.getFetchKey());
+            List<Metadata> metadataList = pipesResult.getEmitData().getMetadataList();
+            if (metadataList != null && !metadataList.isEmpty()) {
+                Metadata metadata = metadataList.get(0);
+                for (String name : metadata.names()) {
+                    String value = metadata.get(name);
+                    if (value != null) {
+                        fetchReplyBuilder.putFields(name, value);
+                    }
+                }
+            }
+            responseObserver.onNext(fetchReplyBuilder.build());
+            responseObserver.onCompleted();
+        }
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
new file mode 100644
index 000000000..f2b350f5e
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -0,0 +1,47 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.tika";
+option java_outer_classname = "TikaProto";
+option objc_class_prefix = "HLW";
+
+package tika;
+
+// Tika Pipes service: register fetchers, then fetch and parse items through them.
+service Tika {
+ // Registers a fetcher under CreateFetcherRequest.name; the reply echoes that name.
+ rpc CreateFetcher (CreateFetcherRequest) returns (CreateFetcherReply) {}
+ // Fetches FetchRequest.fetchKey with the named fetcher and parses it with Tika.
+ // Unary RPC: exactly one FetchReply is returned per request.
+ rpc Fetch (FetchRequest) returns (FetchReply) {}
+}
+
+// Request to register a fetcher with the server.
+message CreateFetcherRequest {
+ // Name that later FetchRequests pass as fetcherName.
+ string name = 1;
+ // Fully-qualified Java class of the fetcher; the server in this module currently only
+ // acts on org.apache.tika.pipes.fetcher.fs.FileSystemFetcher.
+ string fetcherClass = 2;
+ // Fetcher parameters, e.g. basePath and extractFileSystemMetadata for the file-system fetcher.
+ map<string,string> params = 3;
+}
+
+// Reply to CreateFetcher.
+message CreateFetcherReply {
+ // The server echoes the requested fetcher name here.
+ string message = 1;
+}
+
+// Request to fetch and parse one item.
+message FetchRequest {
+ // Name of a fetcher previously registered with CreateFetcher.
+ string fetcherName = 1;
+ // Key of the item to fetch, interpreted by the fetcher (e.g. a file name under the
+ // file-system fetcher's basePath).
+ string fetchKey = 2;
+ // Extra key/value metadata supplied by the caller; the server converts it into Tika Metadata.
+ map<string,string> metadata = 3;
+}
+
+// Parse result for one fetched item.
+message FetchReply {
+ // Echo of FetchRequest.fetchKey.
+ string fetchKey = 1;
+ // Parsed Tika metadata as name -> value (a single value per name).
+ map<string,string> fields = 2;
+}
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
new file mode 100644
index 000000000..f100f676f
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.grpc.TikaServer;
+
+/**
+ * In-process tests for {@link TikaServer.TikaServerImpl}.
+ */
+@RunWith(JUnit4.class)
+public class TikaServerTest {
+    @Rule
+    public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+    /**
+     * CreateFetcher replies with the requested fetcher name as its message. No fetcher class is
+     * set, so the server does not register a fetcher for this request.
+     */
+    @Test
+    public void createFetcher_repliesWithFetcherName() throws Exception {
+        // Generate a unique in-process server name.
+        String serverName = InProcessServerBuilder.generateName();
+
+        // Create a server, add service, start, and register for automatic graceful shutdown.
+        grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
+                .addService(new TikaServer.TikaServerImpl()).build().start());
+
+        // Create a client channel and register for automatic graceful shutdown.
+        TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(grpcCleanup.register(
+                InProcessChannelBuilder.forName(serverName).directExecutor().build()));
+
+        String testName = "test name";
+        CreateFetcherReply reply =
+                blockingStub.createFetcher(CreateFetcherRequest.newBuilder().setName(testName).build());
+
+        assertEquals(testName, reply.getMessage());
+    }
+}
diff --git a/tika-pipes/tika-grpc/tika-config.xml b/tika-pipes/tika-grpc/tika-config.xml
new file mode 100644
index 000000000..b7f8c535c
--- /dev/null
+++ b/tika-pipes/tika-grpc/tika-config.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<properties>
+ <pipes>
+ <params>
+ <numClients>2</numClients>
+ <forkedJvmArgs>
+ <arg>-Xmx1g</arg>
+ <arg>-XX:ParallelGCThreads=2</arg>
+ </forkedJvmArgs>
+ <timeoutMillis>60000</timeoutMillis>
+ <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit -->
+ </params>
+ </pipes>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+ <name>file-system-fetcher-fabd51ef-51c1-447c-818c-96af18b2a893</name>
+ <basePath>C:\Users\nicho\Downloads\000</basePath>
+ </fetcher>
+ </fetchers>
+</properties>
\ No newline at end of file