You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/09/27 19:21:05 UTC

[tika] branch main updated: TIKA-3863 -- add a pipes reporter for OpenSearch

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new d16773efe TIKA-3863 -- add a pipes reporter for OpenSearch
d16773efe is described below

commit d16773efe895af343c1e44c7bdec62f22647af33
Author: tballison <ta...@apache.org>
AuthorDate: Tue Sep 27 15:20:54 2022 -0400

    TIKA-3863 -- add a pipes reporter for OpenSearch
---
 .../pom.xml                                        |  13 ++
 .../pipes/xsearch/tests/TikaPipesXSearchBase.java  |  45 +++-
 .../resources/opensearch/opensearch-mappings.json  |   4 +-
 .../opensearch-parent-child-mappings.json          |   4 +-
 .../opensearch/tika-config-opensearch.xml          |  11 +
 .../src/test/resources/test-documents/fake_oom.xml |  24 +++
 .../src/test/resources/test-documents/npe.xml      |  25 +++
 .../src/test/resources/test-documents/oom.xml      |  24 +++
 tika-pipes/pom.xml                                 |   1 +
 tika-pipes/tika-pipes-reporters/pom.xml            |  41 ++++
 .../tika-pipes-reporter-opensearch/pom.xml         | 120 +++++++++++
 .../pipes/reporters/opensearch/JsonResponse.java   |  60 ++++++
 .../reporters/opensearch/OpenSearchClient.java     | 168 +++++++++++++++
 .../opensearch/OpenSearchPipesReporter.java        | 229 +++++++++++++++++++++
 14 files changed, 760 insertions(+), 9 deletions(-)

diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml
index 40bf61e77..916befd5c 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml
@@ -53,6 +53,19 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-reporter-opensearch</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java
index 1f8378157..7f8321431 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java
@@ -28,6 +28,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -90,7 +92,7 @@ public abstract class TikaPipesXSearchBase {
         sendMappings(endpoint, TEST_INDEX, "opensearch-mappings.json");
 
         runPipes(OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS,
-                OpenSearchEmitter.UpdateStrategy.OVERWRITE,
+                OpenSearchEmitter.UpdateStrategy.UPSERT,
                 HandlerConfig.PARSE_MODE.CONCATENATE, endpoint);
 
         String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
@@ -98,15 +100,37 @@ public abstract class TikaPipesXSearchBase {
 
         JsonResponse results = client.postJson(endpoint + "/_search", query);
         assertEquals(200, results.getStatus());
-        assertEquals(numHtmlDocs + numTestDocs,
+        assertEquals(numHtmlDocs + 1,
                 results.getJson().get("hits").get("total").get("value").asInt());
 
         //now try match all
-        query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} } }";
+        query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} }, " +
+                "\"from\": 0, \"size\": 1000 }";
         results = client.postJson(endpoint + "/_search", query);
         assertEquals(200, results.getStatus());
         assertEquals(numHtmlDocs + numTestDocs,
                 results.getJson().get("hits").get("total").get("value").asInt());
+
+        //now test that the reporter worked
+        Map<String, Integer> statusCounts = new HashMap<>();
+        for (JsonNode n : results.getJson().get("hits").get("hits")) {
+            String status = n.get("_source").get("my_test_parse_status").asText();
+            //this will throw an NPE if the field isn't there
+            //in short, this guarantees that the value is there
+            long parseTimeMs = n.get("_source").get("my_test_parse_time_ms").asLong();
+            Integer cnt = statusCounts.get(status);
+            if (cnt == null) {
+                cnt = 1;
+            } else {
+                cnt++;
+            }
+            statusCounts.put(status, cnt);
+        }
+        //the npe is caught and counted as a "parse success"
+        assertEquals(numHtmlDocs + 1, (int) statusCounts.get("PARSE_SUCCESS"));
+        //the embedded docx is emitted directly
+        assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"));
+        assertEquals(2, (int) statusCounts.get("OOM"));
     }
 
     @Test
@@ -125,7 +149,7 @@ public abstract class TikaPipesXSearchBase {
 
         JsonResponse results = client.postJson(endpoint + "/_search", query);
         assertEquals(200, results.getStatus());
-        assertEquals(numHtmlDocs + numTestDocs,
+        assertEquals(numHtmlDocs + 1,
               results.getJson().get("hits").get("total").get("value").asInt());
 
         //now try match all
@@ -135,7 +159,8 @@ public abstract class TikaPipesXSearchBase {
                 "\"match_all\": {} } }";
         results = client.postJson(endpoint + "/_search", query);
         assertEquals(200, results.getStatus());
-        assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+        assertEquals(numHtmlDocs + 3 + 12, // 3 mock files and...
+                // the .docx file has 11 embedded files, plus itself
                 results.getJson().get("hits").get("total").get("value").asInt());
 
         //now check out one of the embedded files
@@ -189,7 +214,7 @@ public abstract class TikaPipesXSearchBase {
 
         JsonResponse results = client.postJson(endpoint + "/_search", query);
         assertEquals(200, results.getStatus());
-        assertEquals(numHtmlDocs + numTestDocs,
+        assertEquals(numHtmlDocs + 1,
                 results.getJson().get("hits").get("total").get("value").asInt());
 
         //now try match all
@@ -199,7 +224,8 @@ public abstract class TikaPipesXSearchBase {
                 "\"match_all\": {} } }";
         results = client.postJson(endpoint + "/_search", query);
         assertEquals(200, results.getStatus());
-        assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+        assertEquals(numHtmlDocs + 3 + 12, //3 for the mock docs,
+                // and the .docx file has 11 embedded files, plus itself
                 results.getJson().get("hits").get("total").get("value").asInt());
 
         //now check out one of the embedded files
@@ -348,6 +374,11 @@ public abstract class TikaPipesXSearchBase {
                                 Matcher.quoteReplacement(testDocDirectory.toAbsolutePath().toString()))
                         .replace("{PARSE_MODE}", parseMode.name());
 
+        if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+            res = res.replace("{INCLUDE_ROUTING}", "true");
+        } else {
+            res = res.replace("{INCLUDE_ROUTING}", "false");
+        }
         res = res.replace("{OPENSEARCH_CONNECTION}", endpoint);
 
         return res;
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
index d52fc6639..900457d7c 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
@@ -11,7 +11,9 @@
       "title" : { "type" : "text"},
       "mime" : { "type" : "keyword"},
       "tika_exception" : { "type" : "text"},
-      "parent" : { "type" : "text"}
+      "parent" : { "type" : "text"},
+      "my_test_parse_time_ms" : {"type": "long"},
+      "my_test_parse_status": {"type": "keyword"}
     }
   }
 }
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
index f8264b557..4b845fde8 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
@@ -20,7 +20,9 @@
         "relations":{
           "container":"embedded"
         }
-      }
+      },
+      "my_test_parse_time_ms" : {"type": "long"},
+      "my_test_parse_status": {"type": "keyword"}
     }
   }
 }
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
index ee727b1ad..f02f05c92 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
@@ -79,6 +79,17 @@
       </forkedJvmArgs>
       <timeoutMillis>60000</timeoutMillis>
     </params>
+    <pipesReporter class="org.apache.tika.pipes.reporters.opensearch.OpenSearchPipesReporter">
+      <params>
+        <openSearchUrl>{OPENSEARCH_CONNECTION}</openSearchUrl>
+        <keyPrefix>my_test_</keyPrefix>
+        <connectionTimeout>10000</connectionTimeout>
+        <socketTimeout>60000</socketTimeout>
+        <includeRouting>{INCLUDE_ROUTING}</includeRouting>
+        <userName>admin</userName>
+        <password>admin</password>
+      </params>
+    </pipesReporter>
   </async>
   <fetchers>
     <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/fake_oom.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/fake_oom.xml
new file mode 100644
index 000000000..42aa9a778
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/fake_oom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<mock>
+    <metadata action="add" name="author">Nikolai Lobachevsky</metadata>
+    <throw class="java.lang.OutOfMemoryError">oom message</throw>
+</mock>
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/npe.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/npe.xml
new file mode 100644
index 000000000..93599dbc4
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/npe.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<mock>
+  <metadata action="add" name="dc:creator">embeddedAuthor</metadata>
+  <write element="p">some_embedded_content</write>;
+  <throw class="java.lang.NullPointerException">another null pointer exception</throw>;
+</mock>
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/oom.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/oom.xml
new file mode 100644
index 000000000..3ee835e68
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/oom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+<mock>
+  <metadata action="add" name="author">Nikolai Lobachevsky</metadata>
+  <oom/>
+</mock>
\ No newline at end of file
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 3b9096b69..755856926 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -34,6 +34,7 @@
     <module>tika-fetchers</module>
     <module>tika-emitters</module>
     <module>tika-pipes-iterators</module>
+    <module>tika-pipes-reporters</module>
   </modules>
 
   <dependencyManagement>
diff --git a/tika-pipes/tika-pipes-reporters/pom.xml b/tika-pipes/tika-pipes-reporters/pom.xml
new file mode 100644
index 000000000..0710efbeb
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tika</groupId>
+    <artifactId>tika-pipes</artifactId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-pipes-reporters</artifactId>
+  <name>Apache Tika Pipes Reporters</name>
+  <url>https://tika.apache.org/</url>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>tika-pipes-reporter-opensearch</module>
+  </modules>
+
+  <scm>
+    <tag>2.2.1-rc2</tag>
+  </scm>
+</project>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/pom.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/pom.xml
new file mode 100644
index 000000000..c8ec35693
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tika</groupId>
+    <artifactId>tika-pipes-reporters</artifactId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-pipes-reporter-opensearch</artifactId>
+
+  <name>Apache Tika Pipes Reporter - OpenSearch</name>
+  <url>https://tika.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-httpclient-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <Automatic-Module-Name>org.apache.tika.pipes.reporters.opensearch</Automatic-Module-Name>
+            </manifestEntries>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${maven.shade.version}</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>
+                false
+              </createDependencyReducedPom>
+              <!-- <filters> -->
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*</exclude>
+                    <exclude>LICENSE.txt</exclude>
+                    <exclude>NOTICE.txt</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/LICENSE</resource>
+                  <file>target/classes/META-INF/LICENSE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/NOTICE</resource>
+                  <file>target/classes/META-INF/NOTICE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/DEPENDENCIES</resource>
+                  <file>target/classes/META-INF/DEPENDENCIES</file>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <scm>
+    <tag>2.2.1-rc2</tag>
+  </scm>
+</project>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/JsonResponse.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/JsonResponse.java
new file mode 100644
index 000000000..761e93b7f
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/JsonResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.opensearch;
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Immutable holder for the outcome of a json http call: a status plus either
+ * a parsed json tree or a plain-text message, never both.
+ */
+public class JsonResponse {
+
+    private final int status;
+    private final JsonNode root;
+    private final String msg;
+
+    /** Builds a response that carries a parsed json body; the message is left null. */
+    public JsonResponse(int status, JsonNode root) {
+        this(status, root, null);
+    }
+
+    /** Builds a response that carries a plain-text message; the json body is left null. */
+    public JsonResponse(int status, String msg) {
+        this(status, null, msg);
+    }
+
+    //single place where the final fields are assigned
+    private JsonResponse(int status, JsonNode root, String msg) {
+        this.status = status;
+        this.root = root;
+        this.msg = msg;
+    }
+
+    /** @return the status given at construction */
+    public int getStatus() {
+        return status;
+    }
+
+    /** @return the message, or null if this response was built from a json tree */
+    public String getMsg() {
+        return msg;
+    }
+
+    /** @return the json tree, or null if this response was built from a message */
+    public JsonNode getJson() {
+        return root;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("JsonResponse{");
+        sb.append("status=").append(status);
+        sb.append(", msg='").append(msg).append('\'');
+        sb.append(", root=").append(root);
+        return sb.append('}').toString();
+    }
+}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchClient.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchClient.java
new file mode 100644
index 000000000..6e4d53073
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchClient.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.opensearch;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Small http client that upserts metadata into an OpenSearch index through
+ * the {@code _bulk} endpoint.
+ */
+public class OpenSearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchClient.class);
+
+    //ObjectMapper is thread-safe once configured; share one rather than building one per call
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    //this includes the full url and the index, should not end in /
+    //e.g. https://localhost:9200/my-index
+    protected final String openSearchUrl;
+    protected final HttpClient httpClient;
+
+    protected OpenSearchClient(String openSearchUrl, HttpClient httpClient) {
+        this.openSearchUrl = openSearchUrl;
+        this.httpClient = httpClient;
+    }
+
+    /**
+     * Sends one bulk "update" action, with {@code doc_as_upsert}, for a single document.
+     *
+     * @param emitKey  written as the {@code _id} of the update action
+     * @param routing  routing value; left out when {@code StringUtils.isEmpty(routing)}
+     * @param metadata names and values written into the {@code doc} object
+     * @throws IOException         if the http exchange or the json generation fails
+     * @throws TikaClientException if the status is not 200, or the bulk response
+     *                             reports errors or has no {@code errors} field
+     */
+    public void emitDocument(String emitKey, String routing, Metadata metadata)
+            throws IOException, TikaClientException {
+        //we're choosing bulk request to avoid
+        //having to url encode document id
+        //and frankly this was copy/paste/edit from the emitter
+        StringWriter writer = new StringWriter();
+        writeBulkRequest(emitKey, routing, writer);
+        writer.append("\n");
+        writeDoc(metadata, writer);
+        writer.append("\n");
+        emitJson(writer.toString());
+    }
+
+    //posts the payload to <openSearchUrl>/_bulk and turns any failure into an exception
+    private void emitJson(String json) throws IOException, TikaClientException {
+        JsonResponse response = postJson(openSearchUrl + "/_bulk", json);
+        if (response.getStatus() != 200) {
+            throw new TikaClientException(response.getMsg());
+        }
+        JsonNode root = response.getJson();
+        JsonNode errorNode = (root == null) ? null : root.get("errors");
+        if (errorNode == null) {
+            //an empty body or a missing field must not surface as a bare NullPointerException
+            throw new TikaClientException("bulk response has no 'errors' field: " + root);
+        }
+        //if there's a single error, throw the full json.
+        //this has not been thoroughly tested with versions of es < 7
+        if ("true".equals(errorNode.asText())) {
+            throw new TikaClientException(root.toString());
+        }
+    }
+
+    /**
+     * POSTs {@code json} to {@code url}.
+     *
+     * @param url  full target url
+     * @param json request body, sent as utf-8
+     * @return for a 200, a response holding the parsed body ({@code getJson()});
+     *         otherwise the status with the raw body text ({@code getMsg()}),
+     *         which is empty when the response has no entity
+     * @throws IOException if the request fails or the body cannot be read or parsed
+     */
+    public JsonResponse postJson(String url, String json) throws IOException {
+        HttpPost httpRequest = new HttpPost(url);
+        httpRequest.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
+
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (status != 200) {
+                //EntityUtils rejects a null entity, so cover entity-less error responses
+                String msg = response.getEntity() == null ? "" :
+                        new String(EntityUtils.toByteArray(response.getEntity()),
+                                StandardCharsets.UTF_8);
+                return new JsonResponse(status, msg);
+            }
+            try (Reader reader = new BufferedReader(
+                    new InputStreamReader(response.getEntity().getContent(),
+                            StandardCharsets.UTF_8))) {
+                JsonNode node = MAPPER.readTree(reader);
+                //parameterized, so the node is rendered, and only when trace is enabled
+                LOG.trace("node: {}", node);
+                return new JsonResponse(200, node);
+            }
+        } finally {
+            //instanceof is false for null, so no separate null check is needed
+            if (response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse) response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+
+    /**
+     * Writes a json object with a {@code doc} field holding the metadata and
+     * {@code doc_as_upsert} set to true.
+     *
+     * @throws IOException if the json cannot be written
+     */
+    public void writeDoc(Metadata metadata, StringWriter writer) throws IOException {
+        try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeObjectFieldStart("doc");
+            writeMetadata(metadata, jsonGenerator);
+            jsonGenerator.writeEndObject();
+            jsonGenerator.writeBooleanField("doc_as_upsert", true);
+            jsonGenerator.writeEndObject();
+        }
+    }
+
+    /**
+     * Writes the bulk "update" action object: {@code _id}, {@code routing} (only when
+     * not empty) and {@code retry_on_conflict} of 3.
+     *
+     * @throws IOException if the json cannot be written
+     */
+    public void writeBulkRequest(String id, String routing, StringWriter writer)
+            throws IOException {
+        try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeObjectFieldStart("update");
+            jsonGenerator.writeStringField("_id", id);
+            if (!StringUtils.isEmpty(routing)) {
+                jsonGenerator.writeStringField("routing", routing);
+            }
+            jsonGenerator.writeNumberField("retry_on_conflict", 3);
+            jsonGenerator.writeEndObject();
+            jsonGenerator.writeEndObject();
+        }
+    }
+
+    private static void writeMetadata(Metadata metadata, JsonGenerator jsonGenerator)
+            throws IOException {
+        //writes the metadata without the start { or the end }
+        //to allow for other fields to be added
+        for (String n : metadata.names()) {
+            String[] vals = metadata.getValues(n);
+            if (vals.length == 1) {
+                //a single value is written as a plain string field
+                jsonGenerator.writeStringField(n, vals[0]);
+            } else {
+                //anything else is written as an array of strings
+                jsonGenerator.writeArrayFieldStart(n);
+                for (String v : vals) {
+                    jsonGenerator.writeString(v);
+                }
+                jsonGenerator.writeEndArray();
+            }
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
new file mode 100644
index 000000000..bcbbb1565
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.opensearch;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.ExternalProcess;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesReporter;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * A {@link PipesReporter} that sends the outcome of a pipes task to OpenSearch.
+ * <p>
+ * For each result that passes the include/exclude status filter, this reporter
+ * builds a small {@link Metadata} object holding the status name, the elapsed
+ * time and, if the first metadata object of the emit data carries
+ * {@link ExternalProcess#EXIT_VALUE}, the exit value. It then hands that object to
+ * {@link OpenSearchClient#emitDocument} using the tuple's emit key as the document id.
+ * <p>
+ * The setters annotated with {@link Field} are the configuration entry points;
+ * {@link #initialize(Map)} must have run before {@link #report} is called
+ * because it creates the client.
+ */
+public class OpenSearchPipesReporter extends PipesReporter implements Initializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchPipesReporter.class);
+
+    /** Default key for the elapsed time passed to {@link #report}. */
+    public static final String DEFAULT_PARSE_TIME_KEY = "parse_time_ms";
+    /** Default key for the name of the result's status. */
+    public static final String DEFAULT_PARSE_STATUS_KEY = "parse_status";
+    /** Default key for the external process exit value. */
+    public static final String DEFAULT_EXIT_VALUE_KEY = "exit_value";
+
+    //created in initialize()
+    private OpenSearchClient openSearchClient;
+    private String openSearchUrl;
+    private final HttpClientFactory httpClientFactory = new HttpClientFactory();
+
+    //if non-empty, only these statuses are reported and excludeStatus is ignored
+    private final Set<String> includeStatus = new HashSet<>();
+
+    //consulted only when includeStatus is empty
+    private final Set<String> excludeStatus = new HashSet<>();
+
+    private String parseTimeKey = DEFAULT_PARSE_TIME_KEY;
+
+    private String parseStatusKey = DEFAULT_PARSE_STATUS_KEY;
+
+    private String exitValueKey = DEFAULT_EXIT_VALUE_KEY;
+
+    private boolean includeRouting = false;
+
+
+    /**
+     * Reports the status of a single tuple to OpenSearch, unless the status is
+     * filtered out by the include/exclude configuration.
+     * <p>
+     * Failures to emit ({@link IOException} or {@link TikaClientException}) are
+     * logged at warn level and not rethrown.
+     *
+     * @param t the tuple that was processed; its emit key is used as the document
+     *          id and, if routing is enabled, also as the routing value
+     * @param result the result whose status (and optional exit value) is reported
+     * @param elapsed elapsed time for the task, written as a string under the
+     *                parse time key (presumably milliseconds, given the default
+     *                key name -- confirm against the caller)
+     */
+    @Override
+    public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+        if (!shouldReport(result)) {
+            return;
+        }
+
+        Metadata metadata = new Metadata();
+        metadata.set(parseStatusKey, result.getStatus().name());
+        metadata.set(parseTimeKey, Long.toString(elapsed));
+        //only the first metadata object is consulted for the exit value
+        if (result.getEmitData() != null && result.getEmitData().getMetadataList() != null &&
+                !result.getEmitData().getMetadataList().isEmpty()) {
+            Metadata m = result.getEmitData().getMetadataList().get(0);
+            String exitValue = m.get(ExternalProcess.EXIT_VALUE);
+            if (exitValue != null) {
+                metadata.set(exitValueKey, exitValue);
+            }
+        }
+        //TODO -- we're not currently doing anything with the message
+        String emitKey = t.getEmitKey().getEmitKey();
+        //when routing is enabled, the emit key doubles as the routing value
+        String routing = includeRouting ? emitKey : null;
+        try {
+            openSearchClient.emitDocument(emitKey, routing, metadata);
+        } catch (IOException | TikaClientException e) {
+            LOG.warn("failed to report status for '{}'", t.getId(), e);
+        }
+    }
+
+    /**
+     * Decides whether a result should be sent to OpenSearch.
+     * <p>
+     * If any include statuses are configured, only those are reported and the
+     * exclude list is ignored. Otherwise everything is reported except the
+     * statuses in the exclude list.
+     *
+     * @param result the result to test
+     * @return true if the result's status should be reported
+     */
+    private boolean shouldReport(PipesResult result) {
+        String status = result.getStatus().name();
+        if (!includeStatus.isEmpty()) {
+            return includeStatus.contains(status);
+        }
+        return !excludeStatus.contains(status);
+    }
+
+    /**
+     * Passes the connection timeout on to the underlying http client factory.
+     *
+     * @param connectionTimeout the connect timeout to use
+     */
+    @Field
+    public void setConnectionTimeout(int connectionTimeout) {
+        httpClientFactory.setConnectTimeout(connectionTimeout);
+    }
+
+    /**
+     * Passes the socket timeout on to the underlying http client factory.
+     *
+     * @param socketTimeout the socket timeout to use
+     */
+    @Field
+    public void setSocketTimeout(int socketTimeout) {
+        httpClientFactory.setSocketTimeout(socketTimeout);
+    }
+
+    /**
+     * Sets the url that the client is created with. This is the full url,
+     * including the collection, e.g. https://localhost:9200/my-collection
+     *
+     * @param openSearchUrl the full url; must not be blank by the time
+     *                      {@link #initialize(Map)} is called
+     */
+    @Field
+    public void setOpenSearchUrl(String openSearchUrl) {
+        this.openSearchUrl = openSearchUrl;
+    }
+
+    /**
+     * Passes the user name on to the underlying http client factory.
+     *
+     * @param userName the user name to use
+     */
+    @Field
+    public void setUserName(String userName) {
+        httpClientFactory.setUserName(userName);
+    }
+
+    /**
+     * Passes the password on to the underlying http client factory.
+     *
+     * @param password the password to use
+     */
+    @Field
+    public void setPassword(String password) {
+        httpClientFactory.setPassword(password);
+    }
+
+    /**
+     * Passes the authentication scheme on to the underlying http client factory.
+     *
+     * @param authScheme the authentication scheme to use
+     */
+    @Field
+    public void setAuthScheme(String authScheme) {
+        httpClientFactory.setAuthScheme(authScheme);
+    }
+
+    /**
+     * Passes the proxy host on to the underlying http client factory.
+     *
+     * @param proxyHost the proxy host to use
+     */
+    @Field
+    public void setProxyHost(String proxyHost) {
+        httpClientFactory.setProxyHost(proxyHost);
+    }
+
+    /**
+     * Passes the proxy port on to the underlying http client factory.
+     *
+     * @param proxyPort the proxy port to use
+     */
+    @Field
+    public void setProxyPort(int proxyPort) {
+        httpClientFactory.setProxyPort(proxyPort);
+    }
+
+    /**
+     * Adds status names to the include list. If the include list is not empty,
+     * only results with one of these statuses are reported. Names are checked
+     * against {@link PipesResult.STATUS} in {@link #checkInitialization}.
+     *
+     * @param statusList status names to add; existing entries are kept
+     */
+    @Field
+    public void setIncludeStatuses(List<String> statusList) {
+        includeStatus.addAll(statusList);
+    }
+
+    /**
+     * Adds status names to the exclude list. The exclude list is only consulted
+     * when the include list is empty. Names are checked against
+     * {@link PipesResult.STATUS} in {@link #checkInitialization}.
+     *
+     * @param statusList status names to add; existing entries are kept
+     */
+    @Field
+    public void setExcludeStatuses(List<String> statusList) {
+        excludeStatus.addAll(statusList);
+    }
+
+    /**
+     * Controls whether the emit key is also sent as the routing value.
+     *
+     * @param includeRouting if true, the emit key is passed as routing to the
+     *                       client; if false, null is passed
+     */
+    @Field
+    public void setIncludeRouting(boolean includeRouting) {
+        this.includeRouting = includeRouting;
+    }
+
+    /**
+     * This prefixes the keys before sending them to OpenSearch.
+     * For example, "pdfinfo_", would have this reporter sending
+     * "pdfinfo_parse_status", "pdfinfo_parse_time_ms" and
+     * "pdfinfo_exit_value" to OpenSearch.
+     *
+     * @param keyPrefix the string to prepend to each of the default keys
+     */
+    @Field
+    public void setKeyPrefix(String keyPrefix) {
+        this.parseStatusKey = keyPrefix + DEFAULT_PARSE_STATUS_KEY;
+        this.parseTimeKey = keyPrefix + DEFAULT_PARSE_TIME_KEY;
+        this.exitValueKey = keyPrefix + DEFAULT_EXIT_VALUE_KEY;
+    }
+
+    /**
+     * Creates the OpenSearch client from the configured url and an http client
+     * built by the http client factory.
+     *
+     * @param params not used by this implementation
+     * @throws TikaConfigException if the OpenSearch url is blank, or if building
+     *                             the http client or the client fails with it
+     */
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        if (StringUtils.isBlank(openSearchUrl)) {
+            throw new TikaConfigException("Must specify an open search url!");
+        }
+        openSearchClient = new OpenSearchClient(openSearchUrl, httpClientFactory.build());
+    }
+
+    /**
+     * Validates the configuration: the url must be set, no status may be in both
+     * the include and the exclude list, and every configured status must be the
+     * name of a {@link PipesResult.STATUS} constant.
+     *
+     * @param problemHandler not used by this implementation
+     * @throws TikaConfigException if any of the checks above fails
+     */
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        mustNotBeEmpty("openSearchUrl", this.openSearchUrl);
+        for (String status : includeStatus) {
+            if (excludeStatus.contains(status)) {
+                throw new TikaConfigException("Can't have a status in both include and exclude: " +
+                        status);
+            }
+        }
+        //collect the known status names, both for lookup and for the error message
+        Set<String> statuses = new HashSet<>();
+        StringBuilder sb = new StringBuilder();
+        for (PipesResult.STATUS status : PipesResult.STATUS.values()) {
+            statuses.add(status.name());
+            if (sb.length() > 0) {
+                sb.append(", ");
+            }
+            sb.append(status.name());
+        }
+        String recognized = sb.toString();
+        checkRecognized(includeStatus, statuses, "include", recognized);
+        checkRecognized(excludeStatus, statuses, "exclude", recognized);
+    }
+
+    /**
+     * Throws if any configured status is not one of the known status names.
+     *
+     * @param configured the status names set by the user
+     * @param known the names of all {@link PipesResult.STATUS} constants
+     * @param listName "include" or "exclude"; used in the error message only
+     * @param recognized comma-separated known names; used in the error message only
+     * @throws TikaConfigException on the first unrecognized status
+     */
+    private static void checkRecognized(Set<String> configured, Set<String> known,
+                                        String listName, String recognized)
+            throws TikaConfigException {
+        for (String status : configured) {
+            if (!known.contains(status)) {
+                throw new TikaConfigException("I regret I don't recognize '" +
+                        status + "' in the " + listName + " list. " +
+                        "I recognize: " + recognized);
+            }
+        }
+    }
+}