You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by nd...@apache.org on 2024/02/11 19:34:37 UTC

(tika) branch TIKA-4181-grpc created (now 657067b86)

This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a change to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git


      at 657067b86 TIKA-4181 - grpc server and client

This branch includes the following new commits:

     new 657067b86 TIKA-4181 - grpc server and client

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(tika) 01/01: TIKA-4181 - grpc server and client

Posted by nd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ndipiazza pushed a commit to branch TIKA-4181-grpc
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 657067b86733205feb225543c4cc6d40c143425e
Author: Nicholas DiPiazza <nd...@apache.org>
AuthorDate: Sun Feb 11 13:34:26 2024 -0600

    TIKA-4181 - grpc server and client
---
 .../java/org/apache/tika/pipes/PipesConfig.java    |   7 +
 .../org/apache/tika/pipes/PipesClientTest.java     |  49 +++++
 .../org/apache/tika/pipes/tika-sample-config.xml   |  41 ++++
 tika-pipes/pom.xml                                 |   1 +
 tika-pipes/tika-grpc/README.md                     |  13 ++
 tika-pipes/tika-grpc/pom.xml                       | 187 ++++++++++++++++++
 .../org/apache/tika/pipes/grpc/TikaClient.java     | 112 +++++++++++
 .../org/apache/tika/pipes/grpc/TikaServer.java     | 217 +++++++++++++++++++++
 tika-pipes/tika-grpc/src/main/proto/tika.proto     |  47 +++++
 .../org/apache/tika/pipes/grpc/TikaServerTest.java |  59 ++++++
 tika-pipes/tika-grpc/tika-config.xml               |  35 ++++
 11 files changed, 768 insertions(+)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
index 06783d67c..b0e8649f9 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaConfigException;
 
 public class PipesConfig extends PipesConfigBase {
@@ -46,6 +47,12 @@ public class PipesConfig extends PipesConfigBase {
         return pipesConfig;
     }
 
+    /**
+     * Loads the {@code pipes} section of a tika-config XML document supplied as a stream;
+     * this is the stream-based counterpart of {@code load(Path)}.
+     *
+     * @param tikaConfigInputStream tika-config XML to read the pipes settings from
+     * @return a new {@link PipesConfig} populated from the {@code pipes} element
+     * @throws IOException if reading the configuration fails
+     * @throws TikaConfigException if the configuration is invalid
+     */
+    public static PipesConfig load(InputStream tikaConfigInputStream) throws IOException, TikaConfigException {
+        final PipesConfig loaded = new PipesConfig();
+        loaded.configure("pipes", tikaConfigInputStream);
+        return loaded;
+    }
+
+    /** Private: obtain instances through the static {@code load(...)} factories. */
     private PipesConfig() {
 
     }
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
new file mode 100644
index 000000000..46c475546
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesClientTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * Runs a single fetch + parse through a {@link PipesClient} configured from
+ * {@code tika-sample-config.xml}, which declares the file-system fetcher named "fs".
+ */
+class PipesClientTest {
+    private final String fetcherName = "fs";
+    private final String testPdfFile = "testOverlappingText.pdf";
+
+    private PipesClient pipesClient;
+
+    @BeforeEach
+    public void init()
+            throws TikaConfigException, IOException, ParserConfigurationException, SAXException {
+        Path tikaConfigPath = Paths.get("src", "test", "resources", "org", "apache", "tika",
+                "pipes", "tika-sample-config.xml");
+        PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath);
+        pipesClient = new PipesClient(pipesConfig);
+    }
+
+    /** Closes the client after every test so the resources it holds are released. */
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (pipesClient != null) {
+            pipesClient.close();
+        }
+    }
+
+    @Test
+    void process() throws IOException, InterruptedException {
+        PipesResult pipesResult = pipesClient.process(new FetchEmitTuple(testPdfFile,
+                new FetchKey(fetcherName,
+                testPdfFile), new EmitKey(), FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+        // Fail with the pipes status instead of an NPE when processing produced no emit data.
+        Assertions.assertNotNull(pipesResult.getEmitData(),
+                () -> "no emit data; pipes status: " + pipesResult.getStatus());
+        Assertions.assertNotNull(pipesResult.getEmitData().getMetadataList());
+        Assertions.assertEquals(1, pipesResult.getEmitData().getMetadataList().size());
+        Metadata metadata = pipesResult.getEmitData().getMetadataList().get(0);
+        Assertions.assertEquals("testOverlappingText.pdf", metadata.get("resourceName"));
+    }
+}
\ No newline at end of file
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
new file mode 100644
index 000000000..c936852d9
--- /dev/null
+++ b/tika-core/src/test/resources/org/apache/tika/pipes/tika-sample-config.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!-- Sample configuration loaded by PipesClientTest. -->
+<properties>
+  <pipes>
+    <params>
+      <numClients>2</numClients>
+      <forkedJvmArgs>
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+      <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit -->
+    </params>
+  </pipes>
+  <!-- NOTE(review): MockDigesterFactory is presumably a test-only class; confirm it is on the
+       test classpath of tika-core. -->
+  <autoDetectParserConfig>
+    <digesterFactory class="org.apache.tika.pipes.async.MockDigesterFactory">
+      <skipContainerDocument>false</skipContainerDocument>
+    </digesterFactory>
+  </autoDetectParserConfig>
+  <!-- "fs" is the fetcher name PipesClientTest puts in its FetchKey. -->
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>src/test/resources/test-documents</basePath>
+    </fetcher>
+  </fetchers>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 4ef27a191..61738fcd9 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -36,6 +36,7 @@
     <module>tika-pipes-iterators</module>
     <module>tika-pipes-reporters</module>
     <module>tika-async-cli</module>
+    <module>tika-grpc</module>
   </modules>
   <dependencies>
     <dependency>
diff --git a/tika-pipes/tika-grpc/README.md b/tika-pipes/tika-grpc/README.md
new file mode 100644
index 000000000..7b0d4ccd6
--- /dev/null
+++ b/tika-pipes/tika-grpc/README.md
@@ -0,0 +1,13 @@
+# Tika Pipes gRPC Server
+
+This module contains the Tika Pipes gRPC server.
+
+The server will manage a pool of Tika Pipes clients and provide:
+
+* Tika Pipes fetcher CRUD operations (only Create is implemented so far)
+  * Create
+  * Read
+  * Update
+  * Delete
+* Fetch + parse of a given fetch key
+
diff --git a/tika-pipes/tika-grpc/pom.xml b/tika-pipes/tika-grpc/pom.xml
new file mode 100644
index 000000000..121baff41
--- /dev/null
+++ b/tika-pipes/tika-grpc/pom.xml
@@ -0,0 +1,187 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>tika-grpc</artifactId>
+  <packaging>jar</packaging>
+  <!-- No explicit <version>: the module inherits the Tika version from the tika-pipes parent.
+       The gRPC library version is a separate property (grpc.version). -->
+  <name>Apache Tika Pipes GRPC Server</name>
+  <url>https://tika.apache.org/</url>
+
+  <parent>
+    <groupId>org.apache.tika</groupId>
+    <artifactId>tika-pipes</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <grpc.version>1.60.0</grpc.version>
+    <protobuf.version>3.24.0</protobuf.version>
+    <protoc.version>3.24.0</protoc.version>
+    <!-- The server code uses Java 11 APIs (e.g. FileWriter(String, Charset)), so it cannot
+         target Java 8. -->
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
+  </properties>
+
+  <!-- Import the gRPC BOM so the io.grpc dependencies below can omit their versions. -->
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-bom</artifactId>
+        <version>${grpc.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty-shaded</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-services</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.10.1</version> <!-- prevent downgrade via protobuf-java-util -->
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>32.0.1-jre</version> <!-- prevent downgrade of version in protobuf-java-util -->
+    </dependency>
+    <dependency>
+      <groupId>com.google.j2objc</groupId>
+      <artifactId>j2objc-annotations</artifactId>
+      <version>2.8</version> <!-- prevent downgrade of version in guava -->
+    </dependency>
+    <!-- Tika modules are built in this same reactor: depend on the parent's version rather than
+         a released 2.x version, so this module compiles against the current tika-core code. -->
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-async-cli</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-parsers-standard-package</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat</groupId>
+      <artifactId>annotations-api</artifactId>
+      <version>6.0.53</version>
+      <scope>provided</scope> <!-- not needed at runtime -->
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.13.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>3.4.0</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <extensions>
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.7.1</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>0.6.1</version>
+        <configuration>
+          <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
+          <pluginId>grpc-java</pluginId>
+          <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>compile</goal>
+              <goal>compile-custom</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <requireUpperBoundDeps/>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>3.5.0</version>
+        <executions>
+          <execution>
+            <id>test</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <!-- Register the protoc output directories as source roots. basedir has no
+                   trailing separator, so build the paths from project.build.directory. -->
+              <sources>
+                <source>${project.build.directory}/generated-sources/protobuf/grpc-java</source>
+                <source>${project.build.directory}/generated-sources/protobuf/java</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
new file mode 100644
index 000000000..17efe060f
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaClient.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.grpc.Channel;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchReply;
+import org.apache.tika.FetchRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * Example command-line client for {@link TikaServer}. It registers a {@link FileSystemFetcher}
+ * rooted at a base path, then asks the server to fetch and parse one file through that fetcher
+ * and logs the returned metadata.
+ *
+ * <p>Usage: {@code TikaClient <basePath> [fetchKey]}
+ */
+public class TikaClient {
+  private static final Logger logger = Logger.getLogger(TikaClient.class.getName());
+
+  /** Fetch key used when none is given on the command line (the previously hard-coded value). */
+  private static final String DEFAULT_FETCH_KEY = "000164.pdf";
+
+  private final TikaGrpc.TikaBlockingStub blockingStub;
+
+  /**
+   * @param channel channel to the server. It is a plain {@link Channel}, not a
+   *     {@link ManagedChannel}, so shutting it down stays the caller's responsibility.
+   */
+  public TikaClient(Channel channel) {
+    // Passing Channels to code makes code easier to test and makes it easier to reuse Channels.
+    blockingStub = TikaGrpc.newBlockingStub(channel);
+  }
+
+  /**
+   * Asks the server to create a fetcher and logs the reply. RPC failures are logged at WARNING
+   * and otherwise ignored.
+   */
+  public void createFetcher(CreateFetcherRequest createFileSystemFetcherRequest) {
+    CreateFetcherReply response;
+    try {
+      response = blockingStub.createFetcher(createFileSystemFetcherRequest);
+    } catch (StatusRuntimeException e) {
+      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+      return;
+    }
+    logger.info("Create fetcher: " + response.getMessage());
+  }
+
+  /**
+   * Asks the server to fetch and parse a document and logs the returned metadata fields. RPC
+   * failures are logged at WARNING and otherwise ignored.
+   */
+  public void fetch(FetchRequest fetchRequest) {
+    FetchReply fetchReply;
+    try {
+      fetchReply = blockingStub.fetch(fetchRequest);
+    } catch (StatusRuntimeException e) {
+      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+      return;
+    }
+    logger.info("Fetch reply - tika parsed metadata: " + fetchReply.getFieldsMap());
+  }
+
+  /**
+   * Creates a file-system fetcher rooted at {@code args[0]} and fetches {@code args[1]} through
+   * it. When no fetch key is given, {@link #DEFAULT_FETCH_KEY} is used.
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 1 || args.length > 2) {
+      System.err.println("Expects the base path to use for the crawl, optionally followed by "
+          + "the fetch key to parse (default: " + DEFAULT_FETCH_KEY + ").");
+      System.exit(1);
+      return;
+    }
+    String crawlPath = args[0];
+    String fetchKey = args.length > 1 ? args[1] : DEFAULT_FETCH_KEY;
+    String target = "localhost:50051";
+    // Create a communication channel to the server, known as a Channel. Channels are thread-safe
+    // and reusable. It is common to create channels at the beginning of your application and reuse
+    // them until the application shuts down.
+    //
+    // For the example we use plaintext insecure credentials to avoid needing TLS certificates. To
+    // use TLS, use TlsChannelCredentials instead.
+    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
+        .build();
+    try {
+      TikaClient client = new TikaClient(channel);
+      String fetcherId = "file-system-fetcher-" + UUID.randomUUID();
+
+      client.createFetcher(CreateFetcherRequest.newBuilder()
+              .setName(fetcherId)
+              .setFetcherClass(FileSystemFetcher.class.getName())
+              .putParams("basePath", crawlPath)
+              .putParams("extractFileSystemMetadata", "true")
+              .build());
+
+      client.fetch(FetchRequest.newBuilder()
+              .setFetcherName(fetcherId)
+              .setFetchKey(fetchKey)
+              .build());
+    } finally {
+      // ManagedChannels use resources like threads and TCP connections. To prevent leaking these
+      // resources the channel should be shut down when it will no longer be used. If it may be used
+      // again leave it running.
+      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+    }
+  }
+}
diff --git a/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
new file mode 100644
index 000000000..218f81d2c
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/java/org/apache/tika/pipes/grpc/TikaServer.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import io.grpc.Grpc;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.FetchReply;
+import org.apache.tika.FetchRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesClient;
+import org.apache.tika.pipes.PipesConfig;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.AbstractFetcher;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.fetcher.fs.FileSystemFetcher;
+
+/**
+ * gRPC server exposing Tika Pipes operations: creating fetchers and fetching + parsing documents
+ * through a {@link PipesClient}.
+ * <p>
+ * Usage: {@code TikaServer <path-to-tika-config.xml>}. The same config file configures the
+ * {@link PipesClient} and is rewritten whenever a fetcher is created.
+ */
+public class TikaServer {
+    private static final Logger logger = Logger.getLogger(TikaServer.class.getName());
+
+    /** Config file used when no path was supplied on the command line (e.g. in-process tests). */
+    private static final String DEFAULT_TIKA_CONFIG_PATH = "tika-config.xml";
+
+    private Server server;
+
+    private TikaServerImpl service;
+
+    private static String tikaConfigPath;
+
+    private void start() throws Exception {
+        /* The port on which the server should run */
+        int port = 50051;
+        service = new TikaServerImpl(tikaConfigPath != null ? tikaConfigPath : DEFAULT_TIKA_CONFIG_PATH);
+        server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
+                .addService(service).build().start();
+        logger.info("Server started, listening on " + port);
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+            System.err.println("*** shutting down gRPC server since JVM is shutting down");
+            try {
+                TikaServer.this.stop();
+            } catch (InterruptedException e) {
+                e.printStackTrace(System.err);
+            }
+            System.err.println("*** server shut down");
+        }));
+    }
+
+    /**
+     * Shuts the gRPC server down, waiting up to 30 seconds, and then closes the service's
+     * {@link PipesClient}.
+     */
+    private void stop() throws InterruptedException {
+        if (server != null) {
+            server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+        }
+        if (service != null) {
+            service.close();
+        }
+    }
+
+    /**
+     * Await termination on the main thread since the grpc library uses daemon threads.
+     */
+    private void blockUntilShutdown() throws InterruptedException {
+        if (server != null) {
+            server.awaitTermination();
+        }
+    }
+
+    /**
+     * Main launches the server from the command line.
+     *
+     * @param args exactly one argument: the path of the tika-config XML file
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 1) {
+            System.err.println("Expects one command line argument: the path to the tika config file.");
+            System.exit(1);
+            return;
+        }
+        tikaConfigPath = args[0];
+        final TikaServer server = new TikaServer();
+        server.start();
+        server.blockUntilShutdown();
+    }
+
+    /**
+     * Implementation of the {@code Tika} gRPC service. Fetchers are kept in memory and mirrored
+     * into the {@code <fetchers>} section of the config file at {@link #configPath}.
+     */
+    static class TikaServerImpl extends TikaGrpc.TikaImplBase {
+        final Map<String, AbstractFetcher> fetchers = Collections.synchronizedMap(new HashMap<>());
+        /** tika-config XML used to build the pipes client and rewritten when fetchers change. */
+        final String configPath;
+        final PipesConfig pipesConfig;
+        final PipesClient pipesClient;
+
+        /**
+         * Uses the config path passed to {@link TikaServer#main}, or {@code tika-config.xml} in
+         * the working directory when none was supplied.
+         */
+        TikaServerImpl() throws TikaConfigException, IOException {
+            this(tikaConfigPath != null ? tikaConfigPath : DEFAULT_TIKA_CONFIG_PATH);
+        }
+
+        /**
+         * @param configPath tika-config XML file that both configures the pipes client and
+         *     receives the fetchers created through this service
+         */
+        TikaServerImpl(String configPath) throws TikaConfigException, IOException {
+            this.configPath = configPath;
+            // NOTE(review): the pipes client is built from the file as it is now; confirm that
+            // fetchers written to the file later are picked up by it.
+            this.pipesConfig = PipesConfig.load(Paths.get(configPath));
+            this.pipesClient = new PipesClient(pipesConfig);
+        }
+
+        /** Closes the pipes client; failures are reported on stderr and not rethrown. */
+        void close() {
+            try {
+                pipesClient.close();
+            } catch (Exception e) {
+                e.printStackTrace(System.err);
+            }
+        }
+
+        /**
+         * Rewrites the {@code <fetchers>} section of {@link #configPath} so it lists exactly the
+         * fetchers currently registered. For {@link FileSystemFetcher}s only {@code name} and
+         * {@code basePath} are written; other fetcher settings are not persisted.
+         */
+        private synchronized void updateTikaConfig()
+                throws ParserConfigurationException, IOException, SAXException,
+                TransformerException {
+            // parse(File) rather than parse(String): the String overload expects a URI.
+            Document tikaConfigDoc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+                    .parse(Paths.get(configPath).toFile());
+            Element fetchersElement = (Element) tikaConfigDoc.getElementsByTagName("fetchers").item(0);
+            if (fetchersElement == null) {
+                fetchersElement = tikaConfigDoc.createElement("fetchers");
+                tikaConfigDoc.getDocumentElement().appendChild(fetchersElement);
+            }
+            // Remove all existing children. Removing by increasing index would skip every other
+            // node, because the live child list shifts after each removal.
+            while (fetchersElement.hasChildNodes()) {
+                fetchersElement.removeChild(fetchersElement.getFirstChild());
+            }
+            // Iterating a synchronizedMap requires holding the map's own lock.
+            synchronized (fetchers) {
+                for (Map.Entry<String, AbstractFetcher> fetcherEntry : fetchers.entrySet()) {
+                    Element fetcher = tikaConfigDoc.createElement("fetcher");
+                    fetcher.setAttribute("class", fetcherEntry.getValue().getClass().getName());
+                    if (fetcherEntry.getValue() instanceof FileSystemFetcher) {
+                        FileSystemFetcher fileSystemFetcher = (FileSystemFetcher) fetcherEntry.getValue();
+                        Element fetcherName = tikaConfigDoc.createElement("name");
+                        fetcherName.setTextContent(fileSystemFetcher.getName());
+                        fetcher.appendChild(fetcherName);
+                        Element basePath = tikaConfigDoc.createElement("basePath");
+                        basePath.setTextContent(fileSystemFetcher.getBasePath().toAbsolutePath().toString());
+                        fetcher.appendChild(basePath);
+                    }
+                    fetchersElement.appendChild(fetcher);
+                }
+            }
+            // try-with-resources flushes and closes the writer; otherwise output may never reach disk.
+            try (FileWriter writer = new FileWriter(configPath, StandardCharsets.UTF_8)) {
+                Transformer transformer = TransformerFactory.newInstance().newTransformer();
+                transformer.transform(new DOMSource(tikaConfigDoc), new StreamResult(writer));
+            }
+        }
+
+        /**
+         * Registers a fetcher and persists it to the config file. The reply echoes the requested
+         * name. Only {@link FileSystemFetcher} is supported; other classes are logged and skipped.
+         */
+        @Override
+        public void createFetcher(CreateFetcherRequest request,
+                                  StreamObserver<CreateFetcherReply> responseObserver) {
+            CreateFetcherReply reply =
+                    CreateFetcherReply.newBuilder().setMessage(request.getName()).build();
+            if (FileSystemFetcher.class.getName().equals(request.getFetcherClass())) {
+                FileSystemFetcher fileSystemFetcher = new FileSystemFetcher();
+                fileSystemFetcher.setName(request.getName());
+                fileSystemFetcher.setBasePath(request.getParamsOrDefault("basePath", "."));
+                fileSystemFetcher.setExtractFileSystemMetadata(Boolean.parseBoolean(request.getParamsOrDefault("extractFileSystemMetadata", "false")));
+                Map<String, Param> tikaParamsMap = new HashMap<>();
+                for (Map.Entry<String, String> entry : request.getParamsMap().entrySet()) {
+                    tikaParamsMap.put(entry.getKey(),
+                            new Param<>(entry.getKey(), entry.getValue()));
+                }
+                try {
+                    fileSystemFetcher.initialize(tikaParamsMap);
+                } catch (TikaConfigException e) {
+                    responseObserver.onError(Status.INVALID_ARGUMENT
+                            .withDescription("Could not initialize fetcher " + request.getName()
+                                    + ": " + e.getMessage())
+                            .withCause(e)
+                            .asRuntimeException());
+                    return;
+                }
+                fetchers.put(request.getName(), fileSystemFetcher);
+            } else {
+                // TODO: consider answering INVALID_ARGUMENT instead of silently succeeding.
+                logger.warning("Unsupported fetcher class '" + request.getFetcherClass()
+                        + "' for fetcher " + request.getName() + "; no fetcher created");
+            }
+            try {
+                updateTikaConfig();
+            } catch (Exception e) {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Could not update tika config " + configPath + ": "
+                                + e.getMessage())
+                        .withCause(e)
+                        .asRuntimeException());
+                return;
+            }
+            responseObserver.onNext(reply);
+            responseObserver.onCompleted();
+        }
+
+        /**
+         * Fetches and parses {@code request.fetchKey} with the named fetcher, and replies with
+         * the parsed metadata. Failures are reported to the client as gRPC status errors.
+         */
+        @Override
+        public void fetch(FetchRequest request, StreamObserver<FetchReply> responseObserver) {
+            AbstractFetcher fetcher = fetchers.get(request.getFetcherName());
+            if (fetcher == null) {
+                responseObserver.onError(Status.NOT_FOUND
+                        .withDescription("Could not find fetcher with name " + request.getFetcherName())
+                        .asRuntimeException());
+                return;
+            }
+            // NOTE(review): request metadata is collected but not passed to the FetchEmitTuple
+            // below, so it currently has no effect on parsing.
+            Metadata tikaMetadata = new Metadata();
+            for (Map.Entry<String, String> entry : request.getMetadataMap().entrySet()) {
+                tikaMetadata.add(entry.getKey(), entry.getValue());
+            }
+            PipesResult pipesResult;
+            try {
+                pipesResult = pipesClient.process(new FetchEmitTuple(request.getFetchKey(),
+                        new FetchKey(fetcher.getName(), request.getFetchKey()), new EmitKey(),
+                        FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
+            } catch (IOException e) {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("Failed to process fetch key " + request.getFetchKey()
+                                + ": " + e.getMessage())
+                        .withCause(e)
+                        .asRuntimeException());
+                return;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // Always terminate the call; otherwise the client waits until its deadline.
+                responseObserver.onError(Status.CANCELLED
+                        .withDescription("Interrupted while processing fetch key " + request.getFetchKey())
+                        .asRuntimeException());
+                return;
+            }
+            List<Metadata> metadataList = pipesResult.getEmitData() == null
+                    ? null : pipesResult.getEmitData().getMetadataList();
+            if (metadataList == null || metadataList.isEmpty()) {
+                responseObserver.onError(Status.INTERNAL
+                        .withDescription("No parse results for fetch key " + request.getFetchKey()
+                                + " (pipes status: " + pipesResult.getStatus() + ")")
+                        .asRuntimeException());
+                return;
+            }
+            // Fetch is declared unary in tika.proto, so only one reply may be sent per call.
+            // Reply with the first metadata object in the list (presumably the container document).
+            Metadata metadata = metadataList.get(0);
+            FetchReply.Builder fetchReplyBuilder = FetchReply.newBuilder()
+                    .setFetchKey(request.getFetchKey());
+            for (String name : metadata.names()) {
+                String value = metadata.get(name);
+                if (value != null) {
+                    fetchReplyBuilder.putFields(name, value);
+                }
+            }
+            responseObserver.onNext(fetchReplyBuilder.build());
+            responseObserver.onCompleted();
+        }
+    }
+}
diff --git a/tika-pipes/tika-grpc/src/main/proto/tika.proto b/tika-pipes/tika-grpc/src/main/proto/tika.proto
new file mode 100644
index 000000000..f2b350f5e
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/main/proto/tika.proto
@@ -0,0 +1,47 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+syntax = "proto3";
+
+option java_multiple_files = true;
+// Generated Java classes (TikaGrpc and the request/reply messages) live in org.apache.tika.
+option java_package = "org.apache.tika";
+option java_outer_classname = "TikaProto";
+// NOTE(review): "HLW" appears to be carried over from the gRPC HelloWorld example; confirm the
+// intended Objective-C class prefix.
+option objc_class_prefix = "HLW";
+
+package tika;
+
+// Tika Pipes operations served by TikaServer.
+service Tika {
+  // Registers a fetcher under CreateFetcherRequest.name so later Fetch calls can use it.
+  rpc CreateFetcher (CreateFetcherRequest) returns (CreateFetcherReply) {}
+  // Fetches and parses FetchRequest.fetchKey with the named fetcher. This RPC is unary:
+  // exactly one FetchReply is returned per call.
+  rpc Fetch (FetchRequest) returns (FetchReply) {}
+}
+
+message CreateFetcherRequest {
+  // Name the fetcher is registered under; later passed as FetchRequest.fetcherName.
+  string name = 1;
+  // Fully-qualified Java class of the fetcher,
+  // e.g. org.apache.tika.pipes.fetcher.fs.FileSystemFetcher.
+  string fetcherClass = 2;
+  // Fetcher-specific parameters, e.g. basePath / extractFileSystemMetadata for the file-system fetcher.
+  map<string,string> params = 3;
+}
+
+message CreateFetcherReply {
+  // As implemented in TikaServer, this echoes the requested fetcher name.
+  string message = 1;
+}
+
+message FetchRequest {
+  // Name previously given to CreateFetcher.
+  string fetcherName = 1;
+  // Key handed to the fetcher (presumably a path relative to basePath for the
+  // file-system fetcher; confirm).
+  string fetchKey = 2;
+  // Extra metadata for the request. NOTE(review): TikaServer does not yet forward it to parsing.
+  map<string,string> metadata = 3;
+}
+
+message FetchReply {
+  // The fetch key from the request.
+  string fetchKey = 1;
+  // Parsed metadata: field name -> value (TikaServer sends the first value of each field).
+  map<string,string> fields = 2;
+}
diff --git a/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
new file mode 100644
index 000000000..f100f676f
--- /dev/null
+++ b/tika-pipes/tika-grpc/src/test/java/org/apache/tika/pipes/grpc/TikaServerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2016 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tika.pipes.grpc;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import org.apache.tika.CreateFetcherReply;
+import org.apache.tika.CreateFetcherRequest;
+import org.apache.tika.TikaGrpc;
+import org.apache.tika.pipes.grpc.TikaServer;
+
+/**
+ * In-process smoke test for {@link TikaServer.TikaServerImpl}.
+ */
+@RunWith(JUnit4.class)
+public class TikaServerTest {
+  /** Gracefully shuts down every server and channel registered with it after each test. */
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+  /**
+   * Starts the service in-process, calls CreateFetcher, and checks that the reply echoes the
+   * requested fetcher name.
+   * NOTE(review): CreateFetcher rewrites the server's tika config file; confirm this test does
+   * not modify a committed config.
+   */
+  @Test
+  public void greeterImpl_replyMessage() throws Exception {
+    final String uniqueServerName = InProcessServerBuilder.generateName();
+    final TikaServer.TikaServerImpl service = new TikaServer.TikaServerImpl();
+
+    grpcCleanup.register(InProcessServerBuilder.forName(uniqueServerName)
+        .directExecutor()
+        .addService(service)
+        .build()
+        .start());
+
+    final TikaGrpc.TikaBlockingStub stub = TikaGrpc.newBlockingStub(grpcCleanup.register(
+        InProcessChannelBuilder.forName(uniqueServerName).directExecutor().build()));
+
+    final String expectedName = "test name";
+    final CreateFetcherRequest request =
+        CreateFetcherRequest.newBuilder().setName(expectedName).build();
+    final CreateFetcherReply reply = stub.createFetcher(request);
+
+    assertEquals(expectedName, reply.getMessage());
+  }
+}
diff --git a/tika-pipes/tika-grpc/tika-config.xml b/tika-pipes/tika-grpc/tika-config.xml
new file mode 100644
index 000000000..b7f8c535c
--- /dev/null
+++ b/tika-pipes/tika-grpc/tika-config.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<properties>
+  <pipes>
+    <params>
+      <numClients>2</numClients>
+      <forkedJvmArgs>
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+      <maxForEmitBatchBytes>-1</maxForEmitBatchBytes> <!-- disable emit -->
+    </params>
+  </pipes>
+  <!-- Portable example fetcher. The fetchers section may be rewritten by TikaServer when
+       fetchers are created over gRPC, so do not commit machine-specific paths here. -->
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <name>fs</name>
+      <basePath>.</basePath>
+    </fetcher>
+  </fetchers>
+</properties>
\ No newline at end of file