You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/02/11 19:34:38 UTC

(tika) 01/01: TIKA-4181 - grpc server and client

This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 657067b86733205feb225543c4cc6d40c143425e
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Sun Feb 11 13:34:26 2024 -0600

    TIKA-4181 - grpc server and client
---
 .../java/org/apache/tika/pipes/PipesConfig.java    |   7 +
 .../org/apache/tika/pipes/PipesClientTest.java     |  49 +++++
 .../org/apache/tika/pipes/tika-sample-config.xml   |  41 ++++
 tika-pipes/pom.xml                                 |   1 +
 tika-pipes/tika-grpc/README.md                     |  13 ++
 tika-pipes/tika-grpc/pom.xml                       | 187 ++++++++++++++++++
 .../org/apache/tika/pipes/grpc/TikaClient.java     | 112 +++++++++++
 .../org/apache/tika/pipes/grpc/TikaServer.java     | 217 +++++++++++++++++++++
 tika-pipes/tika-grpc/src/main/proto/tika.proto     |  47 +++++
 .../org/apache/tika/pipes/grpc/TikaServerTest.java |  59 ++++++
 tika-pipes/tika-grpc/tika-config.xml               |  35 ++++
 11 files changed, 768 insertions(+)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
index 06783d67c..b0e8649f9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaConfigException;
 
 public class PipesConfig extends PipesConfigBase {
@@ -46,6 +47,27 @@ public class PipesConfig extends PipesConfigBase {
         return pipesConfig;
     }
 
+    /**
+     * Builds a {@link PipesConfig} from the {@code pipes} section of a tika-config XML
+     * document read from {@code tikaConfigInputStream}.
+     * <p>
+     * This method makes no explicit attempt to close the stream; callers should manage its
+     * lifetime, e.g. with try-with-resources.
+     * <p>
+     * NOTE(review): no config file path is available here, unlike when loading from a file;
+     * confirm whether callers need to set the tika config path separately.
+     *
+     * @param tikaConfigInputStream stream containing the tika-config XML
+     * @return a new pipes configuration populated from the stream
+     * @throws IOException propagated from reading the configuration
+     * @throws TikaConfigException propagated from configuring the {@code pipes} section
+     */
+    public static PipesConfig load(InputStream tikaConfigInputStream) throws IOException, TikaConfigException {
+        final PipesConfig config = new PipesConfig();
+        config.configure("pipes", tikaConfigInputStream);
+        return config;
+    }
+
     private PipesConfig() {
 
     }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
new file mode 100644
index 000000000..46c475546
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+
+/**
+ * Runs one document through {@link PipesClient} using the {@code fs} fetcher declared in
+ * {@code tika-sample-config.xml}.
+ */
+class PipesClientTest {
+    private static final String FETCHER_NAME = "fs";
+    private static final String TEST_PDF_FILE = "testOverlappingText.pdf";
+
+    private PipesClient pipesClient;
+
+    @BeforeEach
+    public void init() throws TikaConfigException, IOException {
+        Path tikaConfigPath = Paths.get("src", "test", "resources", "org", "apache", "tika",
+                "pipes", "tika-sample-config.xml");
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath);
+        pipesClient = new PipesClient(pipesConfig);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        // Release the client (and, presumably, the forked JVM configured via forkedJvmArgs)
+        // so repeated test runs do not leak processes.
+        if (pipesClient != null) {
+            pipesClient.close();
+        }
+    }
+
+    @Test
+    void process() throws IOException, InterruptedException {
+        PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(TEST_PDF_FILE,
+                new FetchKey(FETCHER_NAME, TEST_PDF_FILE), new EmitKey(),
+                FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        // Check emit data first so a failed parse reports its status instead of an NPE.
+        assertNotNull(pipesResult.getEmitData(), "no emit data; status=" + pipesResult.getStatus());
+        assertNotNull(pipesResult.getEmitData().getMetadataList());
+        assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
+        Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
+        assertEquals(TEST_PDF_FILE, metadata.get("resourceName"));
+    }
+}
\ No newline at end of file
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
new file mode 100644
index 000000000..c936852d9
--- /dev/null
+++ b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<properties>
+  <pipes>
+    <params>
+      <numClients>2</numClients>
+      <forkedJvmArgs>
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+      <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit -->
+    </params>
+  </pipes>
+  <autoDetectParserConfig>
+    <digesterFactory class="org.apache.tika.pipes.async.MockDigesterFactory">
+      <skipContainerDocument>false</skipContainerDocument>
+    </digesterFactory>
+  </autoDetectParserConfig>
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>src/test/resources/test-documents</basePath>
+    </fetcher>
+  </fetchers>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 4ef27a191..61738fcd9 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -36,6 +36,7 @@
     <module>tika-pipes-iterators</module>
     <module>tika-pipes-reporters</module>
     <module>tika-async-cli</module>
+    <module>tika-grpc</module>
   </modules>
   <dependencies>
     <dependency>
diff --git a/tika-pipes/tika-grpc/README.md b/tika-pipes/tika-grpc/README.md
new file mode 100644
index 000000000..7b0d4ccd6
--- /dev/null
+++ b/tika-pipes/tika-grpc/README.md
@@ -0,0 +1,13 @@
+# Tika Pipes gRPC Server
+
+This module provides a gRPC server for Tika Pipes.
+
+The server manages a pool of Tika Pipes clients and supports:
+
+* Tika Pipes Fetcher CRUD operations (only Create is implemented so far)
+  * Create
+  * Read
+  * Update
+  * Delete
+* Fetch + Parse a given Fetch Item
+
diff --git a/tika-pipes/tika-grpc/pom.xml b/tika-pipes/tika-grpc/pom.xml
new file mode 100644
index 000000000..121baff41
--- /dev/null
+++ b/tika-pipes/tika-grpc/pom.xml
@@ -0,0 +1,183 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>tika-grpc</artifactId>
+  <packaging>jar</packaging>
+  <!-- groupId and version are inherited from the tika-pipes parent. -->
+  <name>Apache Tika Pipes GRPC Server</name>
+  <url>https://tika.apache.org/</url>
+
+  <parent>
+    <groupId>org.apache.tika</groupId>
+    <artifactId>tika-pipes</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <grpc.version>1.60.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
+    <protobuf.version>3.24.0</protobuf.version>
+    <protoc.version>3.24.0</protoc.version>
+    <!-- The Java compiler level is inherited from the Tika parent POM. -->
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-bom</artifactId>
+        <version>${grpc.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty-shaded</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-services</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.10.1</version> <!-- prevent downgrade via protobuf-java-util -->
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>32.0.1-jre</version> <!-- prevent downgrade of version in protobuf-java-util -->
+    </dependency>
+    <dependency>
+      <groupId>com.google.j2objc</groupId>
+      <artifactId>j2objc-annotations</artifactId>
+      <version>2.8</version> <!-- prevent downgrade of version in guava -->
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/org.apache.tika/tika-pipes -->
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-async-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-parsers-standard-package</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat</groupId>
+      <artifactId>annotations-api</artifactId>
+      <version>6.0.53</version>
+      <scope>provided</scope> <!-- not needed at runtime -->
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.13.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>3.4.0</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <extensions>
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.7.1</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>0.6.1</version>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireUpperBoundDeps/>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.5.0</version>
+        <executions>
+          <execution>
+            <id>test</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>${project.build.directory}/generated-sources/protobuf/grpc-java</source>
+                <source>${project.build.directory}/generated-sources/protobuf/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
new file mode 100644
index 000000000..17efe060f
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.grpc.Channel;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchReply;
+import org.apache.tika.FetchRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * Sample command-line client for {@link TikaServer}: registers a file-system fetcher and then
+ * asks the server to fetch and parse one or more keys through it.
+ */
+public class TikaClient {
+  private static final Logger logger = Logger.getLogger(TikaClient.class.getName());
+
+  /** Server address used by {@link #main(String[])}. */
+  private static final String TARGET = "localhost:50051";
+
+  /** Key fetched by {@link #main(String[])} when no fetch key is given on the command line. */
+  private static final String DEFAULT_FETCH_KEY = "000164.pdf";
+
+  private final TikaGrpc.TikaBlockingStub blockingStub;
+
+  /**
+   * Creates a client that issues blocking calls over {@code channel}.
+   *
+   * @param channel channel to the server; it is a Channel, not a ManagedChannel, so shutting it
+   *                down is the caller's responsibility
+   */
+  public TikaClient(Channel channel) {
+    // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
+    blockingStub = TikaGrpc.newBlockingStub(channel);
+  }
+
+  /**
+   * Asks the server to register a fetcher. RPC failures are logged as warnings, not rethrown.
+   *
+   * @param createFileSystemFetcherRequest name, fetcher class and parameters of the fetcher
+   */
+  public void createFetcher(CreateFetcherRequest createFileSystemFetcherRequest) {
+    CreateFetcherReply response;
+    try {
+      response = blockingStub.createFetcher(createFileSystemFetcherRequest);
+    } catch (StatusRuntimeException e) {
+      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+      return;
+    }
+    logger.info("Create fetcher: " + response.getMessage());
+  }
+
+  /**
+   * Asks the server to fetch and parse one document and logs the returned metadata fields. RPC
+   * failures are logged as warnings, not rethrown.
+   *
+   * @param fetchRequest fetcher name and fetch key to process
+   */
+  public void fetch(FetchRequest fetchRequest) {
+    FetchReply fetchReply;
+    try {
+      fetchReply = blockingStub.fetch(fetchRequest);
+    } catch (StatusRuntimeException e) {
+      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+      return;
+    }
+    logger.info("Fetch reply - tika parsed metadata: " + fetchReply.getFieldsMap());
+  }
+
+  /**
+   * Registers a file-system fetcher rooted at {@code args[0]} and fetches every remaining
+   * argument as a fetch key, or {@value #DEFAULT_FETCH_KEY} when none is given.
+   *
+   * @param args {@code <basePath> [<fetchKey> ...]}
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("Usage: TikaClient <basePath> [<fetchKey> ...]");
+      System.err.println("Expects the base path to use for the crawl, optionally followed by the"
+          + " fetch keys to parse (default: " + DEFAULT_FETCH_KEY + ").");
+      System.exit(1);
+      return;
+    }
+    String crawlPath = args[0];
+    List<String> fetchKeys = args.length > 1
+        ? Arrays.asList(args).subList(1, args.length)
+        : Collections.singletonList(DEFAULT_FETCH_KEY);
+    // Create a communication channel to the server, known as a Channel. Channels are thread-safe
+    // and reusable. It is common to create channels at the beginning of your application and reuse
+    // them until the application shuts down.
+    //
+    // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To
+    // use TLS, use TlsChannelCredentials instead.
+    ManagedChannel channel = Grpc.newChannelBuilder(TARGET, InsecureChannelCredentials.create())
+        .build();
+    try {
+      TikaClient client = new TikaClient(channel);
+      String fetcherId = "file-system-fetcher-" + UUID.randomUUID();
+
+      client.createFetcher(CreateFetcherRequest.newBuilder()
+          .setName(fetcherId)
+          .setFetcherClass(FileSystemFetcher.class.getName())
+          .putParams("basePath", crawlPath)
+          .putParams("extractFileSystemMetadata", "true")
+          .build());
+
+      for (String fetchKey : fetchKeys) {
+        client.fetch(FetchRequest.newBuilder()
+            .setFetcherName(fetcherId)
+            .setFetchKey(fetchKey)
+            .build());
+      }
+    } finally {
+      // ManagedChannels use resources like threads and TCP connections. To prevent leaking these
+      // resources the channel should be shut down when it will no longer be used.
+      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+    }
+  }
+}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
new file mode 100644
index 000000000..218f81d2c
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
@@ -0,0 +1,357 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.io.Closeable;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import io.grpc.Grpc;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchReply;
+import org.apache.tika.FetchRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.PipesClient;
+import org.apache.tika.pipes.PipesConfig;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * gRPC server for Tika Pipes: manages startup/shutdown of the server and hosts
+ * {@link TikaServerImpl}, which registers fetchers and runs fetch + parse requests through a
+ * {@link PipesClient}.
+ */
+public class TikaServer {
+    private static final Logger logger = Logger.getLogger(TikaServer.class.getName());
+
+    /** The port on which the server listens. */
+    private static final int PORT = 50051;
+
+    /** Tika config file used when no path is supplied. */
+    static final String DEFAULT_TIKA_CONFIG_PATH = "tika-config.xml";
+
+    private final String tikaConfigPath;
+    private Server server;
+    private TikaServerImpl service;
+
+    /** Creates a server that uses {@value #DEFAULT_TIKA_CONFIG_PATH}. */
+    public TikaServer() {
+        this(DEFAULT_TIKA_CONFIG_PATH);
+    }
+
+    /**
+     * Creates a server backed by the given tika config file.
+     *
+     * @param tikaConfigPath tika config file that configures the pipes client and receives
+     *                       the fetchers registered through {@code CreateFetcher}
+     */
+    public TikaServer(String tikaConfigPath) {
+        this.tikaConfigPath = tikaConfigPath;
+    }
+
+    private void start() throws Exception {
+        service = new TikaServerImpl(tikaConfigPath);
+        server = Grpc.newServerBuilderForPort(PORT, InsecureServerCredentials.create())
+                .addService(service).build().start();
+        logger.info("Server started, listening on " + PORT);
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+            System.err.println("*** shutting down gRPC server since JVM is shutting down");
+            try {
+                TikaServer.this.stop();
+            } catch (InterruptedException e) {
+                e.printStackTrace(System.err);
+            }
+            System.err.println("*** server shut down");
+        }));
+    }
+
+    private void stop() throws InterruptedException {
+        if (server != null) {
+            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+        }
+        if (service != null) {
+            // Close the pipes client only after the server has stopped taking calls.
+            try {
+                service.close();
+            } catch (IOException e) {
+                System.err.println("*** failed to close pipes client: " + e);
+            }
+        }
+    }
+
+    /**
+     * Await termination on the main thread since the grpc library uses daemon threads.
+     */
+    private void blockUntilShutdown() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+
+    /**
+     * Main launches the server from the command line.
+     *
+     * @param args optional path to the tika config file; defaults to
+     *             {@value #DEFAULT_TIKA_CONFIG_PATH}
+     */
+    public static void main(String[] args) throws Exception {
+        String configPath = args.length > 0 ? args[0] : DEFAULT_TIKA_CONFIG_PATH;
+        final TikaServer server = new TikaServer(configPath);
+        server.start();
+        server.blockUntilShutdown();
+    }
+
+    static class TikaServerImpl extends TikaGrpc.TikaImplBase implements Closeable {
+        private final String tikaConfigPath;
+        final Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
+        final PipesConfig pipesConfig;
+        final PipesClient pipesClient;
+
+        /** Uses {@value TikaServer#DEFAULT_TIKA_CONFIG_PATH} as the tika config file. */
+        TikaServerImpl() throws TikaConfigException, IOException {
+            this(DEFAULT_TIKA_CONFIG_PATH);
+        }
+
+        /**
+         * @param tikaConfigPath tika config file to load the pipes configuration from; the same
+         *                       file is rewritten whenever a fetcher is created
+         */
+        TikaServerImpl(String tikaConfigPath) throws TikaConfigException, IOException {
+            this.tikaConfigPath = tikaConfigPath;
+            this.pipesConfig = PipesConfig.load(Paths.get(tikaConfigPath));
+            this.pipesClient = new PipesClient(pipesConfig);
+        }
+
+        /**
+         * Rewrites the {@code <fetchers>} section of the tika config file so that it lists exactly
+         * the currently registered fetchers. Synchronized so concurrent CreateFetcher calls do
+         * not interleave their read-modify-write of the file.
+         */
+        private synchronized void updateTikaConfig()
+                throws ParserConfigurationException, IOException, SAXException,
+                TransformerException {
+            // parse(File) rather than parse(String): the String overload takes a URI, which
+            // fails for plain OS paths such as Windows drive paths.
+            Document tikaConfigDoc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+                    .parse(Paths.get(tikaConfigPath).toFile());
+            Element fetchersElement =
+                    (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
+            if (fetchersElement == null) {
+                fetchersElement = tikaConfigDoc.createElement("fetchers");
+                tikaConfigDoc.getDocumentElement().appendChild(fetchersElement);
+            }
+            // Always remove the first child: removing by ascending index skips every other node
+            // because the live child list shifts after each removal.
+            while (fetchersElement.hasChildNodes()) {
+                fetchersElement.removeChild(fetchersElement.getFirstChild());
+            }
+            // Iterating a Collections.synchronizedMap view requires holding the map's lock.
+            synchronized (fetchers) {
+                for (AbstractFetcher fetcher : fetchers.values()) {
+                    fetchersElement.appendChild(createFetcherElement(tikaConfigDoc, fetcher));
+                }
+            }
+            Transformer transformer = TransformerFactory.newInstance().newTransformer();
+            // try-with-resources so the file is flushed and closed even if the transform fails.
+            try (FileWriter writer = new FileWriter(tikaConfigPath, StandardCharsets.UTF_8)) {
+                transformer.transform(new DOMSource(tikaConfigDoc), new StreamResult(writer));
+            }
+        }
+
+        /**
+         * Serializes one fetcher as a {@code <fetcher class="...">} element. Only
+         * {@link FileSystemFetcher} settings ({@code name}, absolute {@code basePath}) are written.
+         */
+        private static Element createFetcherElement(Document doc, AbstractFetcher fetcher) {
+            Element fetcherElement = doc.createElement("fetcher");
+            fetcherElement.setAttribute("class", fetcher.getClass().getName());
+            if (fetcher instanceof FileSystemFetcher) {
+                FileSystemFetcher fileSystemFetcher = (FileSystemFetcher) fetcher;
+                appendTextElement(doc, fetcherElement, "name", fileSystemFetcher.getName());
+                appendTextElement(doc, fetcherElement, "basePath",
+                        fileSystemFetcher.getBasePath().toAbsolutePath().toString());
+            }
+            return fetcherElement;
+        }
+
+        /** Appends {@code <tag>text</tag>} to {@code parent}. */
+        private static void appendTextElement(Document doc, Element parent, String tag,
+                                              String text) {
+            Element child = doc.createElement(tag);
+            child.setTextContent(text);
+            parent.appendChild(child);
+        }
+
+        /**
+         * Registers a {@link FileSystemFetcher} and persists it to the tika config file. Replies
+         * with the fetcher name, or fails with INVALID_ARGUMENT / INTERNAL.
+         */
+        @Override
+        public void createFetcher(CreateFetcherRequest request,
+                                  StreamObserver<CreateFetcherReply> responseObserver) {
+            if (request.getName().isEmpty()) {
+                responseObserver.onError(Status.INVALID_ARGUMENT
+                        .withDescription("Fetcher name must not be empty").asRuntimeException());
+                return;
+            }
+            // Reject unsupported classes instead of replying as if a fetcher had been created.
+            if (!FileSystemFetcher.class.getName().equals(request.getFetcherClass())) {
+                responseObserver.onError(Status.INVALID_ARGUMENT
+                        .withDescription("Unsupported fetcher class: " + request.getFetcherClass())
+                        .asRuntimeException());
+                return;
+            }
+            FileSystemFetcher fileSystemFetcher = new FileSystemFetcher();
+            fileSystemFetcher.setName(request.getName());
+            fileSystemFetcher.setBasePath(request.getParamsOrDefault("basePath", "."));
+            fileSystemFetcher.setExtractFileSystemMetadata(Boolean.parseBoolean(
+                    request.getParamsOrDefault("extractFileSystemMetadata", "false")));
+            Map<String, Param> tikaParamsMap = new HashMap<>();
+            for (Map.Entry<String, String> entry : request.getParamsMap().entrySet()) {
+                tikaParamsMap.put(entry.getKey(), new Param<>(entry.getKey(), entry.getValue()));
+            }
+            try {
+                fileSystemFetcher.initialize(tikaParamsMap);
+            } catch (TikaConfigException e) {
+                responseObserver.onError(Status.INVALID_ARGUMENT
+                        .withDescription("Could not initialize fetcher " + request.getName())
+                        .withCause(e).asRuntimeException());
+                return;
+            }
+            fetchers.put(request.getName(), fileSystemFetcher);
+            // NOTE(review): a forked pipes process that is already running may not pick up the
+            // rewritten config until it restarts - confirm.
+            try {
+                updateTikaConfig();
+            } catch (ParserConfigurationException | IOException | SAXException
+                    | TransformerException e) {
+                logger.log(Level.SEVERE, "Could not update " + tikaConfigPath, e);
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Could not persist fetcher " + request.getName())
+                        .withCause(e).asRuntimeException());
+                return;
+            }
+            responseObserver.onNext(
+                    CreateFetcherReply.newBuilder().setMessage(request.getName()).build());
+            responseObserver.onCompleted();
+        }
+
+        /**
+         * Fetches and parses one document through a registered fetcher and replies with the
+         * parsed metadata, or fails with NOT_FOUND / INTERNAL / CANCELLED.
+         */
+        @Override
+        public void fetch(FetchRequest request, StreamObserver<FetchReply> responseObserver) {
+            AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+            if (fetcher == null) {
+                responseObserver.onError(Status.NOT_FOUND
+                        .withDescription("Could not find fetcher with name " + request.getFetcherName())
+                        .asRuntimeException());
+                return;
+            }
+            Metadata tikaMetadata = new Metadata();
+            for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
+                tikaMetadata.add(entry.getKey(), entry.getValue());
+            }
+            PipesResult pipesResult;
+            try {
+                // Forward the caller's metadata (NOTE(review): DEFAULT_HANDLER_CONFIG is assumed to
+                // be what the shorter FetchEmitTuple constructors default to - confirm).
+                pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
+                        new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
+                        tikaMetadata, HandlerConfig.DEFAULT_HANDLER_CONFIG,
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+            } catch (IOException e) {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Failed to process " + request.getFetchKey())
+                        .withCause(e).asRuntimeException());
+                return;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // Still complete the call so the client is not left waiting forever.
+                responseObserver.onError(Status.CANCELLED
+                        .withDescription("Interrupted while processing " + request.getFetchKey())
+                        .asRuntimeException());
+                return;
+            }
+            List<Metadata> metadataList = pipesResult.getEmitData() == null ? null
+                    : pipesResult.getEmitData().getMetadataList();
+            if (metadataList == null || metadataList.isEmpty()) {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No metadata for " + request.getFetchKey()
+                                + "; pipes status: " + pipesResult.getStatus())
+                        .asRuntimeException());
+                return;
+            }
+            // Fetch is a unary RPC, so exactly one onNext is allowed. Reply with the first
+            // metadata entry (presumably the container document); returning embedded documents
+            // as well would need a server-streaming RPC.
+            Metadata metadata = metadataList.get(0);
+            FetchReply.Builder fetchReplyBuilder =
+                    FetchReply.newBuilder().setFetchKey(request.getFetchKey());
+            for (String name : metadata.names()) {
+                String value = metadata.get(name);
+                if (value != null) {
+                    fetchReplyBuilder.putFields(name, value);
+                }
+            }
+            responseObserver.onNext(fetchReplyBuilder.build());
+            responseObserver.onCompleted();
+        }
+
+        /** Closes the underlying {@link PipesClient}. */
+        @Override
+        public void close() throws IOException {
+            pipesClient.close();
+        }
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
new file mode 100644
index 000000000..f2b350f5e
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -0,0 +1,61 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.tika";
+option java_outer_classname = "TikaProto";
+option objc_class_prefix = "TIKA";
+
+package tika;
+
+// Tika Pipes operations: register fetchers, then fetch and parse documents through them.
+service Tika {
+  // Registers a fetcher on the server under CreateFetcherRequest.name.
+  rpc CreateFetcher (CreateFetcherRequest) returns (CreateFetcherReply) {}
+  // Fetches FetchRequest.fetchKey through a previously created fetcher and returns the
+  // metadata Tika parsed from it.
+  rpc Fetch (FetchRequest) returns (FetchReply) {}
+}
+
+message CreateFetcherRequest {
+  // Name under which the fetcher is registered; referenced later by FetchRequest.fetcherName.
+  string name = 1;
+  // Fully-qualified Java class name of the fetcher, e.g.
+  // org.apache.tika.pipes.fetcher.fs.FileSystemFetcher.
+  string fetcherClass = 2;
+  // Fetcher settings, e.g. basePath and extractFileSystemMetadata for FileSystemFetcher.
+  map<string,string> params = 3;
+}
+
+message CreateFetcherReply {
+  // Currently echoes the name of the created fetcher.
+  string message = 1;
+}
+
+message FetchRequest {
+  // Name of a fetcher previously registered with CreateFetcher.
+  string fetcherName = 1;
+  // Key identifying the document for that fetcher.
+  string fetchKey = 2;
+  // Metadata supplied by the caller for the document.
+  map<string,string> metadata = 3;
+}
+
+message FetchReply {
+  // The fetchKey from the request.
+  string fetchKey = 1;
+  // Parsed metadata: field name to its (first) value.
+  map<string,string> fields = 2;
+}
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
new file mode 100644
index 000000000..f100f676f
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * In-process tests for {@link TikaServer.TikaServerImpl}.
+ */
+@RunWith(JUnit4.class)
+public class TikaServerTest {
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+  @Test
+  public void createFetcher_repliesWithFetcherName() throws Exception {
+    // Generate a unique in-process server name.
+    String serverName = InProcessServerBuilder.generateName();
+
+    // Create a server, add service, start, and register for automatic graceful shutdown.
+    // NOTE(review): the service reads (and on createFetcher tries to rewrite) a tika config
+    // file; consider pointing it at a temporary copy so the test has no side effects.
+    grpcCleanup.register(InProcessServerBuilder
+        .forName(serverName).directExecutor().addService(new TikaServer.TikaServerImpl()).build().start());
+
+    TikaGrpc.TikaBlockingStub blockingStub = TikaGrpc.newBlockingStub(
+        // Create a client channel and register for automatic graceful shutdown.
+        grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()));
+
+    // Send a complete request: a supported fetcher class plus its basePath parameter.
+    String testName = "test name";
+    CreateFetcherReply reply = blockingStub.createFetcher(CreateFetcherRequest.newBuilder()
+        .setName(testName)
+        .setFetcherClass(FileSystemFetcher.class.getName())
+        .putParams("basePath", "target")
+        .build());
+
+    assertEquals(testName, reply.getMessage());
+  }
+}
diff --git a/tika-pipes/tika-grpc/tika-config.xml b/tika-pipes/tika-grpc/tika-config.xml
new file mode 100644
index 000000000..b7f8c535c
--- /dev/null
+++ b/tika-pipes/tika-grpc/tika-config.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<properties>
+  <pipes>
+    <params>
+      <numClients>2</numClients>
+      <forkedJvmArgs>
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+      <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit -->
+    </params>
+  </pipes>
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>.</basePath>
+    </fetcher>
+  </fetchers>
+</properties>
\ No newline at end of file