You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/02 20:43:10 UTC
[tika] 02/02: TIKA-3440 -- first real draft of OpenSearch
integration
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-3440
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 52380e4277095590a8b8db709fbaba3b5f8add3d
Author: tallison <ta...@apache.org>
AuthorDate: Fri Jul 2 16:42:42 2021 -0400
TIKA-3440 -- first real draft of OpenSearch integration
---
.../pipes/emitter/opensearch/OpenSearchClient.java | 38 +++--
.../emitter/opensearch/OpenSearchEmitter.java | 47 +++---
.../src/test/resources/log4j.properties | 28 ----
.../test/resources/tika-config-simple-emitter.xml | 15 +-
tika-pipes/tika-pipes-integration-tests/pom.xml | 6 +
.../apache/tika/pipes/PipeIntegrationTests.java | 2 +-
.../tika/pipes/opensearch/OpenSearchTest.java | 26 ----
.../pipes/opensearch/OpenSearchTestClient.java | 121 +++++++++++++++
.../pipes/opensearch/TikaPipesOpenSearchTest.java | 172 +++++++++++++++++++++
.../resources/opensearch/opensearch-mappings.json | 16 ++
.../opensearch-parent-child-mappings.json | 26 ++++
.../opensearch/tika-config-opensearch.xml | 113 ++++++++++++++
12 files changed, 502 insertions(+), 108 deletions(-)
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
index 3e60c38..6f98474 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -48,24 +48,30 @@ public class OpenSearchClient {
//this includes the full url and the index, should not end in /
//e.g. https://localhost:9200/my-index
- private final String openSearchUrl;
- private final HttpClient httpClient;
+ protected final String openSearchUrl;
+ protected final HttpClient httpClient;
+ private final OpenSearchEmitter.AttachmentStrategy attachmentStrategy;
- private OpenSearchClient(String openSearchUrl, HttpClient httpClient) {
+ protected OpenSearchClient(String openSearchUrl, HttpClient httpClient,
+ OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
this.openSearchUrl = openSearchUrl;
this.httpClient = httpClient;
+ this.attachmentStrategy = attachmentStrategy;
}
public void addDocument(String emitKey, List<Metadata> metadataList) throws IOException,
TikaClientException {
StringBuilder sb = new StringBuilder();
int i = 0;
+ String routing = (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) ?
+ emitKey : null;
+
for (Metadata metadata : metadataList) {
String id = emitKey;
if (i > 0) {
id += "-" + i;
}
- String indexJson = getBulkIndexJson(id, emitKey);
+ String indexJson = getBulkIndexJson(id, routing);
sb.append(indexJson).append("\n");
if (i == 0) {
sb.append(metadataToJsonContainer(metadata));
@@ -75,9 +81,11 @@ public class OpenSearchClient {
sb.append("\n");
i++;
}
- //System.out.println(sb.toString());
- String requestUrl = openSearchUrl + "/bulk?routing=" + URLEncoder
- .encode(emitKey, StandardCharsets.UTF_8.name());
+ String requestUrl = openSearchUrl + "/_bulk";
+ if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+ requestUrl += "?routing=" + URLEncoder.encode(emitKey, StandardCharsets.UTF_8.name());
+ }
+
JsonResponse response = postJson(requestUrl, sb.toString());
if (response.getStatus() != 200) {
throw new TikaClientException(response.getMsg());
@@ -97,9 +105,11 @@ public class OpenSearchClient {
jsonGenerator.writeStartObject();
writeMetadata(metadata, jsonGenerator);
- jsonGenerator.writeStartObject("relation_type");
- jsonGenerator.writeStringField("name", "embedded");
- jsonGenerator.writeStringField("parent", emitKey);
+ if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+ jsonGenerator.writeStartObject("relation_type");
+ jsonGenerator.writeStringField("name", "embedded");
+ jsonGenerator.writeStringField("parent", emitKey);
+ }
//end the relation type object
jsonGenerator.writeEndObject();
//end the metadata object
@@ -113,7 +123,9 @@ public class OpenSearchClient {
try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
jsonGenerator.writeStartObject();
writeMetadata(metadata, jsonGenerator);
- jsonGenerator.writeStringField("relation_type", "container");
+ if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+ jsonGenerator.writeStringField("relation_type", "container");
+ }
jsonGenerator.writeEndObject();
}
return writer.toString();
@@ -140,7 +152,7 @@ public class OpenSearchClient {
StringWriter writer = new StringWriter();
try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
jsonGenerator.writeStartObject();
- jsonGenerator.writeStartObject("index");
+ jsonGenerator.writeObjectFieldStart("index");
jsonGenerator.writeStringField("_id", id);
if (!StringUtils.isEmpty(routing)) {
jsonGenerator.writeStringField("routing", routing);
@@ -152,7 +164,7 @@ public class OpenSearchClient {
return writer.toString();
}
- protected JsonResponse postJson(String url, String json) throws IOException {
+ public JsonResponse postJson(String url, String json) throws IOException {
HttpPost httpRequest = new HttpPost(url);
ByteArrayEntity entity = new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
httpRequest.setEntity(entity);
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
index b5b9820..fe878fe 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
@@ -19,17 +19,14 @@ package org.apache.tika.pipes.emitter.opensearch;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.client.TikaClientException;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
@@ -37,18 +34,22 @@ import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.utils.StringUtils;
public class OpenSearchEmitter extends AbstractEmitter implements Initializable {
+
+ /**
+ * Strategy selected via the emitter's attachmentStrategy setting; the emitter's
+ * default is PARENT_CHILD.
+ * In OpenSearchClient, PARENT_CHILD is the only value that is tested for: when it is
+ * set, the bulk request is routed by the emit key and a "relation_type" field is
+ * written ("container", or "embedded" with the emit key as parent).
+ * NOTE(review): no SKIP- or CONCATENATE_CONTENT-specific handling is visible in
+ * this change -- confirm where those two strategies are applied.
+ */
+ public enum AttachmentStrategy {
+ SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+ //anything else?
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchEmitter.class);
private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
- private String solrCollection;
- private String openSearchUrl;
+ private String openSearchUrl = null;
private String contentField = "content";
private String idField = "_id";
private int commitWithin = 1000;
@@ -66,9 +67,13 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
LOG.warn("metadataList is null or empty");
return;
}
- openSearchClient.addDocument(emitKey, metadataList);
+ try {
+ openSearchClient.addDocument(emitKey, metadataList);
+ } catch (TikaClientException e) {
+ throw new TikaEmitterException("failed to add document", e);
+ }
}
-
+/*
private void addMetadataAsSolrInputDocuments(String emitKey, List<Metadata> metadataList,
List<SolrInputDocument> docsToUpdate)
throws IOException, TikaEmitterException {
@@ -167,7 +172,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
}
}
}
- }
+ }*/
/**
* Options: SKIP, CONCATENATE_CONTENT, PARENT_CHILD. Default is "PARENT_CHILD".
@@ -233,6 +238,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
this.idField = idField;
}
+ //this is the full url, including the collection, e.g. https://localhost:9200/my-collection
@Field
public void setOpenSearchUrl(String openSearchUrl) {
this.openSearchUrl = openSearchUrl;
@@ -270,30 +276,15 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
throw new TikaConfigException("Must specify an open search url!");
} else {
openSearchClient =
- new OpenSearchClient(openSearchUrl, httpClientFactory.build());
+ new OpenSearchClient(openSearchUrl, httpClientFactory.build(), attachmentStrategy);
}
}
@Override
public void checkInitialization(InitializableProblemHandler problemHandler)
throws TikaConfigException {
- mustNotBeEmpty("solrCollection", this.solrCollection);
- mustNotBeEmpty("urlFieldName", this.idField);
- if ((this.solrUrls == null || this.solrUrls.isEmpty()) &&
- (this.solrZkHosts == null || this.solrZkHosts.isEmpty())) {
- throw new IllegalArgumentException(
- "expected either param solrUrls or param solrZkHosts, but neither was specified");
- }
- if (this.solrUrls != null && !this.solrUrls.isEmpty() && this.solrZkHosts != null &&
- !this.solrZkHosts.isEmpty()) {
- throw new IllegalArgumentException(
- "expected either param solrUrls or param solrZkHosts, but both were specified");
- }
- }
-
- public enum AttachmentStrategy {
- SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
- //anything else?
+ mustNotBeEmpty("openSearchUrl", this.openSearchUrl);
+ mustNotBeEmpty("idField", this.idField);
}
}
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties
deleted file mode 100644
index d17a4a1..0000000
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-status=info
-name=PropertiesConfig
-filters=threshold
-filter.threshold.type=ThresholdFilter
-filter.threshold.level=info
-appenders=console
-appender.console.type=Console
-appender.console.name=STDERR
-appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
-rootLogger.level=info
-rootLogger.appenderRefs=stderr
-rootLogger.appenderRef.stderr.ref=STDERR
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
index c52da5e..7959bf6 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
@@ -30,23 +30,14 @@
</metadataFilter>
</metadataFilters>
<emitters>
- <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
+ <emitter class="org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter">
<params>
- <name>solr1</name>
- <url>http://localhost:8983/solr/tika-test</url>
+ <name>opensearch1</name>
+ <url>http://localhost:9200/tika-test</url>
<attachmentStrategy>concatenate-content</attachmentStrategy>
<contentField>content</contentField>
<commitWithin>10</commitWithin>
</params>
</emitter>
- <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
- <params>
- <name>solr2</name>
- <url>http://localhost:8983/solr/tika-test</url>
- <attachmentStrategy>parent-child</attachmentStrategy>
- <contentField>content</contentField>
- <commitWithin>10</commitWithin>
- </params>
- </emitter>
</emitters>
</properties>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml b/tika-pipes/tika-pipes-integration-tests/pom.xml
index d4db22b..db5805d 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-pipes-integration-tests/pom.xml
@@ -141,6 +141,12 @@
</exclusions>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-emitter-opensearch</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 7cad1da..8112142 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -50,7 +50,7 @@ import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
-@Ignore("turn these into actual tests")
+@Ignore("turn these into actual tests with mock s3")
public class PipeIntegrationTests {
private static final Path OUTDIR = Paths.get("");
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java
deleted file mode 100644
index 3d22250..0000000
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.opensearch;
-
-import org.junit.Test;
-
-public class OpenSearchTest {
- @Test
- public void testOne() throws Exception {
-
- }
-}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTestClient.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTestClient.java
new file mode 100644
index 0000000..297891e
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTestClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.opensearch;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import org.apache.tika.pipes.emitter.opensearch.JsonResponse;
+import org.apache.tika.pipes.emitter.opensearch.OpenSearchClient;
+import org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter;
+
+/**
+ * This expands on the OpenSearchClient for testing purposes.
+ * This has more functionality than is needed for sending docs to OpenSearch
+ */
+public class OpenSearchTestClient extends OpenSearchClient {
+
+    //ObjectMapper is thread-safe once configured, so one shared instance is enough
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * @param openSearchUrl      passed straight through to {@link OpenSearchClient}
+     * @param httpClient         client used for every request made by this class
+     * @param attachmentStrategy passed straight through to {@link OpenSearchClient}
+     */
+    protected OpenSearchTestClient(String openSearchUrl, HttpClient httpClient,
+                                   OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
+        super(openSearchUrl, httpClient, attachmentStrategy);
+    }
+
+    /**
+     * Sends the json as the UTF-8 encoded body of an HTTP PUT.
+     *
+     * @param url  full url to PUT to
+     * @param json request body
+     * @return the parsed json body on a 200; otherwise the status and the raw body as a string
+     * @throws IOException if the request fails or the body cannot be read
+     */
+    protected JsonResponse putJson(String url, String json) throws IOException {
+        HttpPut httpRequest = new HttpPut(url);
+        httpRequest.setEntity(new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8)));
+        return execute(httpRequest);
+    }
+
+    /**
+     * Issues an HTTP GET.
+     *
+     * @param url full url to GET
+     * @return the parsed json body on a 200; otherwise the status and the raw body as a string
+     * @throws IOException if the request fails or the body cannot be read
+     */
+    protected JsonResponse getJson(String url) throws IOException {
+        return execute(new HttpGet(url));
+    }
+
+    /**
+     * Shared request/response handling for {@link #putJson} and {@link #getJson}:
+     * sets the json headers, executes the request and always releases the connection.
+     */
+    private JsonResponse execute(org.apache.http.client.methods.HttpRequestBase httpRequest)
+            throws IOException {
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
+
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (status == 200) {
+                try (Reader reader = new BufferedReader(
+                        new InputStreamReader(response.getEntity().getContent(),
+                                StandardCharsets.UTF_8))) {
+                    return new JsonResponse(200, OBJECT_MAPPER.readTree(reader));
+                }
+            }
+            //anything other than 200: hand back the raw body as the message
+            return new JsonResponse(status,
+                    new String(EntityUtils.toByteArray(response.getEntity()),
+                            StandardCharsets.UTF_8));
+        } finally {
+            //instanceof is already false for null, so no separate null check is needed
+            if (response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse) response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/TikaPipesOpenSearchTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/TikaPipesOpenSearchTest.java
new file mode 100644
index 0000000..a6b6a5e
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/TikaPipesOpenSearchTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.opensearch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testcontainers.utility.DockerImageName;
+
+import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.pipes.PipeIntegrationTests;
+import org.apache.tika.pipes.emitter.opensearch.JsonResponse;
+import org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter;
+
+/**
+ * Integration test: writes html files to disk, runs tika pipes (via TikaCLI) with the
+ * OpenSearchEmitter against an OpenSearch docker container, then searches the index
+ * to confirm that every file was indexed.
+ */
+public class TikaPipesOpenSearchTest {
+
+    private static final String collection = "testcol";
+    private static final File testFileFolder = new File("target", "test-files");
+    //number of html files written to testFileFolder and expected back from the search
+    private final int numDocs = 42;
+    protected GenericContainer<?> openSearch;
+    private String openSearchHost;
+    private int openSearchPort;
+    //this includes the collection, e.g. https://localhost:49213/testcol
+    private String openSearchEndpoint;
+    private OpenSearchTestClient client;
+
+    @Rule
+    public GenericContainer<?> openSearchContainer =
+            new GenericContainer<>(DockerImageName.parse(getOpenSearchImageName()))
+                    .withExposedPorts(9200)
+                    .withEnv("discovery.type", "single-node");
+
+    private String getOpenSearchImageName() {
+        return "opensearchproject/opensearch:1.0.0-rc1";
+    }
+
+    @Before
+    public void setupTest() throws Exception {
+        setupOpenSearch(openSearchContainer);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        FileUtils.deleteDirectory(testFileFolder);
+    }
+
+    @Test
+    public void testFSToOpenSearch() throws Exception {
+        //create the collection with mappings
+        String mappings = IOUtils.toString(TikaPipesOpenSearchTest.class.getResourceAsStream(
+                "/opensearch/opensearch-mappings.json"), StandardCharsets.UTF_8);
+        int status = -1;
+        int tries = 0;
+        JsonResponse response = null;
+        //need to wait a bit sometimes before OpenSearch is up
+        while (status != 200 && tries++ < 3) {
+            response = client.putJson(openSearchEndpoint, mappings);
+            //read the status of this attempt first, so we only back off after a failure
+            status = response.getStatus();
+            if (status != 200) {
+                Thread.sleep(1000);
+            }
+        }
+        if (status != 200) {
+            throw new IllegalArgumentException("couldn't create index/add mappings");
+        }
+        assertTrue(response.getJson().get("acknowledged").asBoolean());
+        assertEquals(collection, response.getJson().get("index").asText());
+
+        runPipes(OpenSearchEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+        //refresh to make sure the content is searchable
+        JsonResponse refresh = client.getJson(openSearchEndpoint + "/_refresh");
+        assertEquals(200, refresh.getStatus());
+
+        String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+                "\"query\": \"initial\" } } } }";
+
+        JsonResponse results = client.postJson(openSearchEndpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+
+        assertEquals(numDocs, results.getJson().get("hits").get("total").get("value").asInt());
+    }
+
+    /**
+     * Copies the log4j2 config and the filled-in tika config template into "target"
+     * and runs TikaCLI in async mode ("-a") against that tika config.
+     */
+    private void runPipes(OpenSearchEmitter.AttachmentStrategy attachmentStrategy) throws Exception {
+        File tikaConfigFile = new File("target", "ta-opensearch.xml");
+        File log4jPropFile = new File("target", "tmp-log4j2.xml");
+        try (InputStream is = PipeIntegrationTests.class
+                .getResourceAsStream("/pipes-fork-server-custom-log4j2.xml")) {
+            FileUtils.copyInputStreamToFile(is, log4jPropFile);
+        }
+        String tikaConfigTemplateXml;
+        try (InputStream is = PipeIntegrationTests.class
+                .getResourceAsStream("/opensearch/tika-config-opensearch.xml")) {
+            tikaConfigTemplateXml = IOUtils.toString(is, StandardCharsets.UTF_8);
+        }
+
+        String tikaConfigXml =
+                createTikaConfigXml(tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
+                        attachmentStrategy);
+        FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
+
+        TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
+    }
+
+    /**
+     * Fills in the {PLACEHOLDER} tokens of the tika config template.
+     * Every substitution uses the literal String.replace: replaceAll would interpret
+     * backslashes and '$' in the replacement (e.g. in a Windows path) as escapes.
+     */
+    @NotNull
+    private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile, String tikaConfigTemplateXml,
+                                       OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
+        String res =
+                tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
+                        .replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
+                        .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
+                        .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath());
+
+        res = res.replace("{OPENSEARCH_CONNECTION}", openSearchEndpoint);
+
+        return res;
+    }
+
+    /**
+     * Writes the test files, works out the https endpoint of the running container
+     * and builds the test client with the admin/admin credentials.
+     */
+    private void setupOpenSearch(GenericContainer<?> openSearchContainer) throws Exception {
+        createTestHtmlFiles("initial");
+        this.openSearch = openSearchContainer;
+        openSearchHost = openSearch.getHost();
+        openSearchPort = openSearch.getMappedPort(9200);
+        openSearchEndpoint = "https://" + openSearchHost + ":" + openSearchPort + "/" + collection;
+        HttpClientFactory httpClientFactory = new HttpClientFactory();
+        httpClientFactory.setUserName("admin");
+        httpClientFactory.setPassword("admin");
+        //attachment strategy is not used here...TODO clean this up
+        client = new OpenSearchTestClient(openSearchEndpoint,
+                httpClientFactory.build(), OpenSearchEmitter.AttachmentStrategy.SKIP);
+    }
+
+    /** Writes numDocs small html files, each with bodyContent as its body, to testFileFolder. */
+    private void createTestHtmlFiles(String bodyContent) throws Exception {
+        testFileFolder.mkdirs();
+        for (int i = 0; i < numDocs; ++i) {
+            FileUtils.writeStringToFile(new File(testFileFolder, "test-" + i + ".html"),
+                    "<html><body>" + bodyContent + "</body></html>", StandardCharsets.UTF_8);
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-mappings.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
new file mode 100644
index 0000000..4b87142
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
@@ -0,0 +1,16 @@
+{
+ "settings": {
+ "number_of_shards": 1
+ },
+ "mappings" : {
+ "dynamic": false,
+ "properties" : {
+ "content" : { "type" : "text"},
+ "length" : { "type" : "long"},
+ "creators" : { "type" : "text"},
+ "title" : { "type" : "text"},
+ "mime" : { "type" : "keyword"},
+ "tika_exception" : { "type" : "text"}
+ }
+ }
+}
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
new file mode 100644
index 0000000..def8438
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
@@ -0,0 +1,26 @@
+{
+ "settings": {
+ "number_of_shards": 1
+ },
+ "mappings" : {
+ "dynamic": false,
+ "_routing": {
+ "required":true
+ },
+ "properties" : {
+ "content" : { "type" : "text"},
+ "length" : { "type" : "long"},
+ "creators" : { "type" : "text"},
+ "title" : { "type" : "text"},
+ "mime" : { "type" : "keyword"},
+ "tika_exception" : { "type" : "text"},
+ "relation_type":{
+ "type":"join",
+ "eager_global_ordinals":true,
+ "relations":{
+ "container":"embedded"
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
new file mode 100644
index 0000000..ccb0f8f
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<properties>
+ <parsers>
+ <parser class="org.apache.tika.parser.DefaultParser">
+ <parser-exclude class="org.apache.tika.parser.ocr.TesseractOCRParser"/>
+ <parser-exclude class="org.apache.tika.parser.pdf.PDFParser"/>
+ <parser-exclude class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser"/>
+ <parser-exclude class="org.apache.tika.parser.microsoft.OfficeParser"/>
+ </parser>
+ <parser class="org.apache.tika.parser.pdf.PDFParser">
+ <params>
+ <param name="extractActions" type="bool">true</param>
+ <param name="checkExtractAccessPermissions" type="bool">true</param>
+ </params>
+ </parser>
+ <parser class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser">
+ <params>
+ <param name="includeDeletedContent" type="bool">true</param>
+ <param name="includeMoveFromContent" type="bool">true</param>
+ <param name="extractMacros" type="bool">true</param>
+ </params>
+ </parser>
+ <parser class="org.apache.tika.parser.microsoft.OfficeParser">
+ <params>
+ <param name="extractMacros" type="bool">true</param>
+ </params>
+ </parser>
+ </parsers>
+ <metadataFilters>
+ <metadataFilter class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
+ <params>
+ <excludeUnmapped>true</excludeUnmapped>
+ <mappings>
+ <mapping from="X-TIKA:content" to="content"/>
+ <mapping from="Content-Length" to="length"/>
+ <mapping from="dc:creator" to="creators"/>
+ <mapping from="dc:title" to="title"/>
+ <mapping from="Content-Type" to="mime"/>
+ <mapping from="X-TIKA:EXCEPTION:container_exception" to="tika_exception"/>
+ </mappings>
+ </params>
+ </metadataFilter>
+ </metadataFilters>
+ <async>
+ <params>
+ <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+ <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+ <emitWithinMillis>60000</emitWithinMillis>
+ <numEmitters>1</numEmitters>
+ <numClients>1</numClients>
+ <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+ <forkedJvmArgs>
+ <arg>-Xmx1g</arg>
+ <arg>-XX:ParallelGCThreads=2</arg>
+ <arg>-XX:+ExitOnOutOfMemoryError</arg>
+ <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+ </forkedJvmArgs>
+ <timeoutMillis>60000</timeoutMillis>
+ </params>
+ </async>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+ <params>
+ <name>fsf</name>
+ <basePath>{PATH_TO_DOCS}</basePath>
+ </params>
+ </fetcher>
+ </fetchers>
+ <emitters>
+ <emitter class="org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter">
+ <params>
+ <name>ose</name>
+ <openSearchUrl>{OPENSEARCH_CONNECTION}</openSearchUrl>
+<!-- TODO: implement this
+ <updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
+ -->
+ <attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
+ <contentField>content</contentField>
+ <commitWithin>10</commitWithin>
+ <idField>_id</idField>
+ <connectionTimeout>10000</connectionTimeout>
+ <socketTimeout>60000</socketTimeout>
+ <userName>admin</userName>
+ <password>admin</password>
+ </params>
+ </emitter>
+ </emitters>
+ <pipesIterator class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
+ <params>
+ <basePath>{PATH_TO_DOCS}</basePath>
+ <fetcherName>fsf</fetcherName>
+ <emitterName>ose</emitterName>
+ </params>
+ </pipesIterator>
+</properties>