You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/09/27 19:21:05 UTC
[tika] branch main updated: TIKA-3863 -- add a pipes reporter for OpenSearch
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new d16773efe TIKA-3863 -- add a pipes reporter for OpenSearch
d16773efe is described below
commit d16773efe895af343c1e44c7bdec62f22647af33
Author: tballison <ta...@apache.org>
AuthorDate: Tue Sep 27 15:20:54 2022 -0400
TIKA-3863 -- add a pipes reporter for OpenSearch
---
.../pom.xml | 13 ++
.../pipes/xsearch/tests/TikaPipesXSearchBase.java | 45 +++-
.../resources/opensearch/opensearch-mappings.json | 4 +-
.../opensearch-parent-child-mappings.json | 4 +-
.../opensearch/tika-config-opensearch.xml | 11 +
.../src/test/resources/test-documents/fake_oom.xml | 24 +++
.../src/test/resources/test-documents/npe.xml | 25 +++
.../src/test/resources/test-documents/oom.xml | 24 +++
tika-pipes/pom.xml | 1 +
tika-pipes/tika-pipes-reporters/pom.xml | 41 ++++
.../tika-pipes-reporter-opensearch/pom.xml | 120 +++++++++++
.../pipes/reporters/opensearch/JsonResponse.java | 60 ++++++
.../reporters/opensearch/OpenSearchClient.java | 168 +++++++++++++++
.../opensearch/OpenSearchPipesReporter.java | 229 +++++++++++++++++++++
14 files changed, 760 insertions(+), 9 deletions(-)
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml
index 40bf61e77..916befd5c 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/pom.xml
@@ -53,6 +53,19 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-pipes-reporter-opensearch</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java
index 1f8378157..7f8321431 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/xsearch/tests/TikaPipesXSearchBase.java
@@ -28,6 +28,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -90,7 +92,7 @@ public abstract class TikaPipesXSearchBase {
sendMappings(endpoint, TEST_INDEX, "opensearch-mappings.json");
runPipes(OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS,
- OpenSearchEmitter.UpdateStrategy.OVERWRITE,
+ OpenSearchEmitter.UpdateStrategy.UPSERT,
HandlerConfig.PARSE_MODE.CONCATENATE, endpoint);
String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
@@ -98,15 +100,37 @@ public abstract class TikaPipesXSearchBase {
JsonResponse results = client.postJson(endpoint + "/_search", query);
assertEquals(200, results.getStatus());
- assertEquals(numHtmlDocs + numTestDocs,
+ assertEquals(numHtmlDocs + 1,
results.getJson().get("hits").get("total").get("value").asInt());
//now try match all
- query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} } }";
+ query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} }, " +
+ "\"from\": 0, \"size\": 1000 }";
results = client.postJson(endpoint + "/_search", query);
assertEquals(200, results.getStatus());
assertEquals(numHtmlDocs + numTestDocs,
results.getJson().get("hits").get("total").get("value").asInt());
+
+ //now test that the reporter worked
+ Map<String, Integer> statusCounts = new HashMap<>();
+ for (JsonNode n : results.getJson().get("hits").get("hits")) {
+ String status = n.get("_source").get("my_test_parse_status").asText();
+ //this will throw an NPE if the field isn't there
+ //in short, this guarantees that the value is there
+ long parseTimeMs = n.get("_source").get("my_test_parse_time_ms").asLong();
+ Integer cnt = statusCounts.get(status);
+ if (cnt == null) {
+ cnt = 1;
+ } else {
+ cnt++;
+ }
+ statusCounts.put(status, cnt);
+ }
+ //the npe is caught and counted as a "parse success"
+ assertEquals(numHtmlDocs + 1, (int) statusCounts.get("PARSE_SUCCESS"));
+ //the embedded docx is emitted directly
+ assertEquals(1, (int) statusCounts.get("EMIT_SUCCESS"));
+ assertEquals(2, (int) statusCounts.get("OOM"));
}
@Test
@@ -125,7 +149,7 @@ public abstract class TikaPipesXSearchBase {
JsonResponse results = client.postJson(endpoint + "/_search", query);
assertEquals(200, results.getStatus());
- assertEquals(numHtmlDocs + numTestDocs,
+ assertEquals(numHtmlDocs + 1,
results.getJson().get("hits").get("total").get("value").asInt());
//now try match all
@@ -135,7 +159,8 @@ public abstract class TikaPipesXSearchBase {
"\"match_all\": {} } }";
results = client.postJson(endpoint + "/_search", query);
assertEquals(200, results.getStatus());
- assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+ assertEquals(numHtmlDocs + 3 + 12, // 3 mock files and...
+ // the .docx file has 11 embedded files, plus itself
results.getJson().get("hits").get("total").get("value").asInt());
//now check out one of the embedded files
@@ -189,7 +214,7 @@ public abstract class TikaPipesXSearchBase {
JsonResponse results = client.postJson(endpoint + "/_search", query);
assertEquals(200, results.getStatus());
- assertEquals(numHtmlDocs + numTestDocs,
+ assertEquals(numHtmlDocs + 1,
results.getJson().get("hits").get("total").get("value").asInt());
//now try match all
@@ -199,7 +224,8 @@ public abstract class TikaPipesXSearchBase {
"\"match_all\": {} } }";
results = client.postJson(endpoint + "/_search", query);
assertEquals(200, results.getStatus());
- assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+ assertEquals(numHtmlDocs + 3 + 12, //3 for the mock docs,
+ // and the .docx file has 11 embedded files, plus itself
results.getJson().get("hits").get("total").get("value").asInt());
//now check out one of the embedded files
@@ -348,6 +374,11 @@ public abstract class TikaPipesXSearchBase {
Matcher.quoteReplacement(testDocDirectory.toAbsolutePath().toString()))
.replace("{PARSE_MODE}", parseMode.name());
+ if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+ res = res.replace("{INCLUDE_ROUTING}", "true");
+ } else {
+ res = res.replace("{INCLUDE_ROUTING}", "false");
+ }
res = res.replace("{OPENSEARCH_CONNECTION}", endpoint);
return res;
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
index d52fc6639..900457d7c 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
@@ -11,7 +11,9 @@
"title" : { "type" : "text"},
"mime" : { "type" : "keyword"},
"tika_exception" : { "type" : "text"},
- "parent" : { "type" : "text"}
+ "parent" : { "type" : "text"},
+ "my_test_parse_time_ms" : {"type": "long"},
+ "my_test_parse_status": {"type": "keyword"}
}
}
}
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
index f8264b557..4b845fde8 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
@@ -20,7 +20,9 @@
"relations":{
"container":"embedded"
}
- }
+ },
+ "my_test_parse_time_ms" : {"type": "long"},
+ "my_test_parse_status": {"type": "keyword"}
}
}
}
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
index ee727b1ad..f02f05c92 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
@@ -79,6 +79,17 @@
</forkedJvmArgs>
<timeoutMillis>60000</timeoutMillis>
</params>
+ <pipesReporter class="org.apache.tika.pipes.reporters.opensearch.OpenSearchPipesReporter">
+ <params>
+ <openSearchUrl>{OPENSEARCH_CONNECTION}</openSearchUrl>
+ <keyPrefix>my_test_</keyPrefix>
+ <connectionTimeout>10000</connectionTimeout>
+ <socketTimeout>60000</socketTimeout>
+ <includeRouting>{INCLUDE_ROUTING}</includeRouting>
+ <userName>admin</userName>
+ <password>admin</password>
+ </params>
+ </pipesReporter>
</async>
<fetchers>
<fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/fake_oom.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/fake_oom.xml
new file mode 100644
index 000000000..42aa9a778
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/fake_oom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<mock>
+ <metadata action="add" name="author">Nikolai Lobachevsky</metadata>
+ <throw class="java.lang.OutOfMemoryError">oom message</throw>
+</mock>
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/npe.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/npe.xml
new file mode 100644
index 000000000..93599dbc4
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/npe.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<mock>
+ <metadata action="add" name="dc:creator">embeddedAuthor</metadata>
+ <write element="p">some_embedded_content</write>;
+ <throw class="java.lang.NullPointerException">another null pointer exception</throw>;
+</mock>
\ No newline at end of file
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/oom.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/oom.xml
new file mode 100644
index 000000000..3ee835e68
--- /dev/null
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/test-documents/oom.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<mock>
+ <metadata action="add" name="author">Nikolai Lobachevsky</metadata>
+ <oom/>
+</mock>
\ No newline at end of file
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 3b9096b69..755856926 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -34,6 +34,7 @@
<module>tika-fetchers</module>
<module>tika-emitters</module>
<module>tika-pipes-iterators</module>
+ <module>tika-pipes-reporters</module>
</modules>
<dependencyManagement>
diff --git a/tika-pipes/tika-pipes-reporters/pom.xml b/tika-pipes/tika-pipes-reporters/pom.xml
new file mode 100644
index 000000000..0710efbeb
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-pipes</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>tika-pipes-reporters</artifactId>
+ <name>Apache Tika Pipes Reporters</name>
+ <url>https://tika.apache.org/</url>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>tika-pipes-reporter-opensearch</module>
+ </modules>
+
+ <scm>
+ <tag>2.2.1-rc2</tag>
+ </scm>
+</project>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/pom.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/pom.xml
new file mode 100644
index 000000000..c8ec35693
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.tika</groupId>
+ <artifactId>tika-pipes-reporters</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>tika-pipes-reporter-opensearch</artifactId>
+
+ <name>Apache Tika Pipes Reporter - OpenSearch</name>
+ <url>https://tika.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-httpclient-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Automatic-Module-Name>org.apache.tika.pipes.reporters.opensearch</Automatic-Module-Name>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven.shade.version}</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>
+ false
+ </createDependencyReducedPom>
+ <!-- <filters> -->
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*</exclude>
+ <exclude>LICENSE.txt</exclude>
+ <exclude>NOTICE.txt</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <resource>META-INF/LICENSE</resource>
+ <file>target/classes/META-INF/LICENSE</file>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <resource>META-INF/NOTICE</resource>
+ <file>target/classes/META-INF/NOTICE</file>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+ <resource>META-INF/DEPENDENCIES</resource>
+ <file>target/classes/META-INF/DEPENDENCIES</file>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+ <scm>
+ <tag>2.2.1-rc2</tag>
+ </scm>
+</project>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/JsonResponse.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/JsonResponse.java
new file mode 100644
index 000000000..761e93b7f
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/JsonResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.opensearch;
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Value holder for the result of an HTTP call whose body is expected to be JSON.
+ * <p>
+ * The public constructors populate either the parsed JSON body or a plain-text
+ * message, never both; the other accessor returns {@code null}.
+ */
+public class JsonResponse {
+
+    private final int status;
+    private final String msg;
+    private final JsonNode root;
+
+    /**
+     * Creates a response carrying a parsed JSON body.
+     *
+     * @param status HTTP status code
+     * @param root   parsed JSON body
+     */
+    public JsonResponse(int status, JsonNode root) {
+        this(status, null, root);
+    }
+
+    /**
+     * Creates a response carrying a plain-text message instead of JSON.
+     *
+     * @param status HTTP status code
+     * @param msg    message text, e.g. the raw body of a failed request
+     */
+    public JsonResponse(int status, String msg) {
+        this(status, msg, null);
+    }
+
+    private JsonResponse(int status, String msg, JsonNode root) {
+        this.status = status;
+        this.msg = msg;
+        this.root = root;
+    }
+
+    /** @return the HTTP status code */
+    public int getStatus() {
+        return status;
+    }
+
+    /** @return the plain-text message, or {@code null} for a JSON response */
+    public String getMsg() {
+        return msg;
+    }
+
+    /** @return the parsed JSON body, or {@code null} for a message response */
+    public JsonNode getJson() {
+        return root;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("JsonResponse{");
+        sb.append("status=").append(status);
+        sb.append(", msg='").append(msg).append('\'');
+        sb.append(", root=").append(root);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchClient.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchClient.java
new file mode 100644
index 000000000..6e4d53073
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchClient.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.opensearch;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Minimal HTTP client that upserts Tika {@link Metadata} into an OpenSearch index.
+ * <p>
+ * Each document is sent to the {@code _bulk} endpoint as an {@code update} action
+ * with {@code doc_as_upsert}, so its fields are merged into an existing document
+ * with the same id, or a new document is created if none exists.
+ */
+public class OpenSearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchClient.class);
+
+    // Jackson factories/mappers are thread-safe once configured; share them
+    // rather than allocating new instances on every request.
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    //this includes the full url and the index, should not end in /
+    //e.g. https://localhost:9200/my-index
+    protected final String openSearchUrl;
+    protected final HttpClient httpClient;
+
+    protected OpenSearchClient(String openSearchUrl, HttpClient httpClient) {
+        this.openSearchUrl = openSearchUrl;
+        this.httpClient = httpClient;
+    }
+
+    /**
+     * Upserts {@code metadata} into the document whose id is {@code emitKey}.
+     *
+     * @param emitKey  document id
+     * @param routing  routing value; omitted from the request when empty
+     * @param metadata fields to write into the document
+     * @throws IOException         if the HTTP request fails
+     * @throws TikaClientException if OpenSearch rejects the request or reports an error
+     */
+    public void emitDocument(String emitKey, String routing, Metadata metadata)
+            throws IOException, TikaClientException {
+
+        StringWriter writer = new StringWriter();
+        //we're choosing bulk request to avoid
+        //having to url encode document id
+        //and frankly this was copy/paste/edit from the emitter
+        writeBulkRequest(emitKey, routing, writer);
+        writer.append("\n");
+        writeDoc(metadata, writer);
+        writer.append("\n");
+        emitJson(writer.toString());
+    }
+
+    private void emitJson(String json) throws IOException, TikaClientException {
+        String requestUrl = openSearchUrl + "/_bulk";
+        JsonResponse response = postJson(requestUrl, json);
+        if (response.getStatus() != 200) {
+            throw new TikaClientException(
+                    "status=" + response.getStatus() + ": " + response.getMsg());
+        }
+        //if there's a single error, throw the full json.
+        //this has not been thoroughly tested with versions of es < 7
+        JsonNode root = response.getJson();
+        JsonNode errorNode = (root == null) ? null : root.get("errors");
+        if (errorNode == null) {
+            // a well-formed bulk response always carries "errors"; without it
+            // we cannot tell whether the write succeeded
+            throw new TikaClientException("unexpected bulk response: " + root);
+        }
+        if (errorNode.asBoolean()) {
+            throw new TikaClientException(root.toString());
+        }
+    }
+
+    /**
+     * POSTs {@code json} to {@code url} with JSON content-type/accept headers.
+     *
+     * @param url  full request url
+     * @param json request body
+     * @return for HTTP 200, a response holding the parsed body (a {@code null} root
+     * if the response had no entity); for any other status, a response holding
+     * the status and the raw body as UTF-8 text
+     * @throws IOException if the request fails or the body cannot be read or parsed
+     */
+    public JsonResponse postJson(String url, String json) throws IOException {
+        HttpPost httpRequest = new HttpPost(url);
+        StringEntity entity = new StringEntity(json, StandardCharsets.UTF_8);
+        httpRequest.setEntity(entity);
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
+
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            HttpEntity responseEntity = response.getEntity();
+            if (status != 200) {
+                return new JsonResponse(status, readBody(responseEntity));
+            }
+            if (responseEntity == null) {
+                return new JsonResponse(200, (JsonNode) null);
+            }
+            try (Reader reader = new BufferedReader(
+                    new InputStreamReader(responseEntity.getContent(),
+                            StandardCharsets.UTF_8))) {
+                JsonNode node = OBJECT_MAPPER.readTree(reader);
+                // parameterized, so the node is only stringified when trace is on
+                LOG.trace("node: {}", node);
+                return new JsonResponse(200, node);
+            }
+        } finally {
+            // instanceof is already false for null
+            if (response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse) response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+
+    // Returns the entity body decoded as UTF-8, or "" when there is no body.
+    private static String readBody(HttpEntity entity) throws IOException {
+        if (entity == null) {
+            return "";
+        }
+        byte[] bytes = EntityUtils.toByteArray(entity);
+        return (bytes == null) ? "" : new String(bytes, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Writes the bulk update body
+     * {@code {"doc": {<metadata>}, "doc_as_upsert": true}} to {@code writer}.
+     */
+    public void writeDoc(Metadata metadata, StringWriter writer) throws IOException {
+        // closing the generator also closes the StringWriter, which is a no-op,
+        // so callers can keep appending to it afterwards
+        try (JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeObjectFieldStart("doc");
+            writeMetadata(metadata, jsonGenerator);
+            jsonGenerator.writeEndObject();
+            jsonGenerator.writeBooleanField("doc_as_upsert", true);
+            jsonGenerator.writeEndObject();
+        }
+    }
+
+    /**
+     * Writes the bulk action line
+     * {@code {"update": {"_id": id, "routing": routing, "retry_on_conflict": 3}}}
+     * to {@code writer}; {@code routing} is omitted when empty.
+     */
+    public void writeBulkRequest(String id, String routing, StringWriter writer)
+            throws IOException {
+        try (JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeObjectFieldStart("update");
+            jsonGenerator.writeStringField("_id", id);
+            if (!StringUtils.isEmpty(routing)) {
+                jsonGenerator.writeStringField("routing", routing);
+            }
+            jsonGenerator.writeNumberField("retry_on_conflict", 3);
+            jsonGenerator.writeEndObject();
+            jsonGenerator.writeEndObject();
+        }
+    }
+
+    // Writes each metadata name as a field: a single value as a string, multiple
+    // values as an array. Emits no surrounding braces so callers can wrap it.
+    private static void writeMetadata(Metadata metadata, JsonGenerator jsonGenerator)
+            throws IOException {
+        for (String n : metadata.names()) {
+            String[] vals = metadata.getValues(n);
+            if (vals.length == 1) {
+                jsonGenerator.writeStringField(n, vals[0]);
+            } else {
+                jsonGenerator.writeArrayFieldStart(n);
+                for (String v : vals) {
+                    jsonGenerator.writeString(v);
+                }
+                jsonGenerator.writeEndArray();
+            }
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
new file mode 100644
index 000000000..bcbbb1565
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.opensearch;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.ExternalProcess;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesReporter;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * {@link PipesReporter} that sends the outcome of each processed {@link FetchEmitTuple}
+ * to OpenSearch via {@link OpenSearchClient#emitDocument}.
+ * <p>
+ * The reported document is keyed by the tuple's emit key. It contains:
+ * <ul>
+ *     <li>the parse status</li>
+ *     <li>the {@code elapsed} value passed to {@link #report}</li>
+ *     <li>the external process exit value, when the first emitted metadata object has one</li>
+ * </ul>
+ * Field names can be prefixed with {@link #setKeyPrefix(String)}. Which statuses are
+ * reported is controlled by {@link #setIncludeStatuses(List)} and
+ * {@link #setExcludeStatuses(List)}.
+ */
+public class OpenSearchPipesReporter extends PipesReporter implements Initializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchPipesReporter.class);
+
+    public static final String DEFAULT_PARSE_TIME_KEY = "parse_time_ms";
+    public static final String DEFAULT_PARSE_STATUS_KEY = "parse_status";
+    public static final String DEFAULT_EXIT_VALUE_KEY = "exit_value";
+
+    private OpenSearchClient openSearchClient;
+    private String openSearchUrl;
+    private final HttpClientFactory httpClientFactory = new HttpClientFactory();
+
+    //if non-empty, only these statuses are reported and excludeStatus is not consulted
+    private final Set<String> includeStatus = new HashSet<>();
+
+    //statuses that are not reported; only consulted when includeStatus is empty
+    private final Set<String> excludeStatus = new HashSet<>();
+
+    private String parseTimeKey = DEFAULT_PARSE_TIME_KEY;
+
+    private String parseStatusKey = DEFAULT_PARSE_STATUS_KEY;
+
+    private String exitValueKey = DEFAULT_EXIT_VALUE_KEY;
+
+    //if true, the emit key is also sent as the routing value
+    private boolean includeRouting = false;
+
+
+    /**
+     * Sends the status of one processed tuple to OpenSearch, unless the status is
+     * filtered out by the include/exclude lists.
+     * <p>
+     * {@link IOException}s and {@link TikaClientException}s from the client are logged
+     * as warnings and not rethrown.
+     *
+     * @param t       the tuple that was processed; its emit key is the document id
+     * @param result  the pipes result whose status is reported
+     * @param elapsed value stored under the parse-time key
+     */
+    @Override
+    public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+        if (!shouldReport(result)) {
+            return;
+        }
+        Metadata metadata = buildReportMetadata(result, elapsed);
+        //TODO -- we're not currently doing anything with the message
+        String emitKey = t.getEmitKey().getEmitKey();
+        String routing = includeRouting ? emitKey : null;
+        try {
+            openSearchClient.emitDocument(emitKey, routing, metadata);
+        } catch (IOException | TikaClientException e) {
+            LOG.warn("failed to report status for '{}'", t.getId(), e);
+        }
+    }
+
+    //builds the document to report: status, elapsed time and, if present, the exit value
+    private Metadata buildReportMetadata(PipesResult result, long elapsed) {
+        Metadata metadata = new Metadata();
+        metadata.set(parseStatusKey, result.getStatus().name());
+        metadata.set(parseTimeKey, Long.toString(elapsed));
+        String exitValue = getExitValue(result);
+        if (exitValue != null) {
+            metadata.set(exitValueKey, exitValue);
+        }
+        return metadata;
+    }
+
+    //returns the external process exit value from the first emitted metadata object, or null
+    private static String getExitValue(PipesResult result) {
+        if (result.getEmitData() == null) {
+            return null;
+        }
+        List<Metadata> metadataList = result.getEmitData().getMetadataList();
+        if (metadataList == null || metadataList.isEmpty()) {
+            return null;
+        }
+        return metadataList.get(0).get(ExternalProcess.EXIT_VALUE);
+    }
+
+    //a non-empty include list takes precedence over the exclude list
+    private boolean shouldReport(PipesResult result) {
+        String status = result.getStatus().name();
+        if (!includeStatus.isEmpty()) {
+            return includeStatus.contains(status);
+        }
+        return !excludeStatus.contains(status);
+    }
+
+    @Field
+    public void setConnectionTimeout(int connectionTimeout) {
+        httpClientFactory.setConnectTimeout(connectionTimeout);
+    }
+
+    @Field
+    public void setSocketTimeout(int socketTimeout) {
+        httpClientFactory.setSocketTimeout(socketTimeout);
+    }
+
+    //this is the full url, including the index name, e.g. https://localhost:9200/my-collection
+    @Field
+    public void setOpenSearchUrl(String openSearchUrl) {
+        this.openSearchUrl = openSearchUrl;
+    }
+
+    @Field
+    public void setUserName(String userName) {
+        httpClientFactory.setUserName(userName);
+    }
+
+    @Field
+    public void setPassword(String password) {
+        httpClientFactory.setPassword(password);
+    }
+
+    @Field
+    public void setAuthScheme(String authScheme) {
+        httpClientFactory.setAuthScheme(authScheme);
+    }
+
+    @Field
+    public void setProxyHost(String proxyHost) {
+        httpClientFactory.setProxyHost(proxyHost);
+    }
+
+    @Field
+    public void setProxyPort(int proxyPort) {
+        httpClientFactory.setProxyPort(proxyPort);
+    }
+
+    /**
+     * Adds statuses (names of {@link PipesResult.STATUS}) to report.
+     * If any are set, only these statuses are reported.
+     */
+    @Field
+    public void setIncludeStatuses(List<String> statusList) {
+        includeStatus.addAll(statusList);
+    }
+
+    /**
+     * Adds statuses (names of {@link PipesResult.STATUS}) not to report.
+     * The exclude list is ignored when the include list is non-empty.
+     */
+    @Field
+    public void setExcludeStatuses(List<String> statusList) {
+        excludeStatus.addAll(statusList);
+    }
+
+    /**
+     * If {@code true}, the emit key is also sent to OpenSearch as the routing value.
+     */
+    @Field
+    public void setIncludeRouting(boolean includeRouting) {
+        this.includeRouting = includeRouting;
+    }
+
+    /**
+     * This prefixes the keys before sending them to OpenSearch.
+     * For example, "pdfinfo_" would have this reporter sending
+     * "pdfinfo_parse_status", "pdfinfo_parse_time_ms" and "pdfinfo_exit_value"
+     * to OpenSearch.
+     * @param keyPrefix prefix for all reported keys; {@code null} is treated as no prefix
+     */
+    @Field
+    public void setKeyPrefix(String keyPrefix) {
+        String prefix = keyPrefix == null ? "" : keyPrefix;
+        this.parseStatusKey = prefix + DEFAULT_PARSE_STATUS_KEY;
+        this.parseTimeKey = prefix + DEFAULT_PARSE_TIME_KEY;
+        this.exitValueKey = prefix + DEFAULT_EXIT_VALUE_KEY;
+    }
+
+    /**
+     * Builds the OpenSearch client from the configured url and http settings.
+     *
+     * @throws TikaConfigException if no OpenSearch url is configured
+     */
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        if (StringUtils.isBlank(openSearchUrl)) {
+            throw new TikaConfigException("Must specify an open search url!");
+        }
+        openSearchClient = new OpenSearchClient(openSearchUrl, httpClientFactory.build());
+    }
+
+    /**
+     * Validates the configuration.
+     *
+     * @throws TikaConfigException if the url is empty, if a status is on both the
+     *                             include and the exclude list, or if either list holds a
+     *                             name that is not a {@link PipesResult.STATUS}
+     */
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        mustNotBeEmpty("openSearchUrl", this.openSearchUrl);
+        for (String status : includeStatus) {
+            if (excludeStatus.contains(status)) {
+                throw new TikaConfigException("Can't have a status in both include and exclude: " +
+                        status);
+            }
+        }
+        PipesResult.STATUS[] knownStatuses = PipesResult.STATUS.values();
+        String[] names = new String[knownStatuses.length];
+        Set<String> recognized = new HashSet<>();
+        for (int i = 0; i < knownStatuses.length; i++) {
+            names[i] = knownStatuses[i].name();
+            recognized.add(names[i]);
+        }
+        //enum declaration order, for a stable error message
+        String recognizedList = String.join(", ", names);
+        checkRecognized(includeStatus, "include", recognized, recognizedList);
+        checkRecognized(excludeStatus, "exclude", recognized, recognizedList);
+    }
+
+    //throws if any requested status is not among the recognized status names
+    private static void checkRecognized(Set<String> requested, String listName,
+                                        Set<String> recognized, String recognizedList)
+            throws TikaConfigException {
+        for (String status : requested) {
+            if (!recognized.contains(status)) {
+                throw new TikaConfigException("I regret I don't recognize '" +
+                        status + "' in the " + listName + " list. " +
+                        "I recognize: " + recognizedList);
+            }
+        }
+    }
+}