You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/02 20:43:10 UTC

[tika] 02/02: TIKA-3440 -- first real draft of OpenSearch integration

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3440
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 52380e4277095590a8b8db709fbaba3b5f8add3d
Author: tallison <ta...@apache.org>
AuthorDate: Fri Jul 2 16:42:42 2021 -0400

    TIKA-3440 -- first real draft of OpenSearch integration
---
 .../pipes/emitter/opensearch/OpenSearchClient.java |  38 +++--
 .../emitter/opensearch/OpenSearchEmitter.java      |  47 +++---
 .../src/test/resources/log4j.properties            |  28 ----
 .../test/resources/tika-config-simple-emitter.xml  |  15 +-
 tika-pipes/tika-pipes-integration-tests/pom.xml    |   6 +
 .../apache/tika/pipes/PipeIntegrationTests.java    |   2 +-
 .../tika/pipes/opensearch/OpenSearchTest.java      |  26 ----
 .../pipes/opensearch/OpenSearchTestClient.java     | 121 +++++++++++++++
 .../pipes/opensearch/TikaPipesOpenSearchTest.java  | 172 +++++++++++++++++++++
 .../resources/opensearch/opensearch-mappings.json  |  16 ++
 .../opensearch-parent-child-mappings.json          |  26 ++++
 .../opensearch/tika-config-opensearch.xml          | 113 ++++++++++++++
 12 files changed, 502 insertions(+), 108 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
index 3e60c38..6f98474 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -48,24 +48,30 @@ public class OpenSearchClient {
 
     //this includes the full url and the index, should not end in /
     //e.g. https://localhost:9200/my-index
-    private final String openSearchUrl;
-    private final HttpClient httpClient;
+    protected final String openSearchUrl;
+    protected final HttpClient httpClient;
+    private final OpenSearchEmitter.AttachmentStrategy attachmentStrategy;
 
-    private OpenSearchClient(String openSearchUrl, HttpClient httpClient) {
+    protected OpenSearchClient(String openSearchUrl, HttpClient httpClient,
+                               OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
         this.openSearchUrl = openSearchUrl;
         this.httpClient = httpClient;
+        this.attachmentStrategy = attachmentStrategy;
     }
 
     public void addDocument(String emitKey, List<Metadata> metadataList) throws IOException,
             TikaClientException {
         StringBuilder sb = new StringBuilder();
         int i = 0;
+        String routing = (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) ?
+                emitKey : null;
+
         for (Metadata metadata : metadataList) {
             String id = emitKey;
             if (i > 0) {
                 id += "-" + i;
             }
-            String indexJson = getBulkIndexJson(id, emitKey);
+            String indexJson = getBulkIndexJson(id, routing);
             sb.append(indexJson).append("\n");
             if (i == 0) {
                 sb.append(metadataToJsonContainer(metadata));
@@ -75,9 +81,11 @@ public class OpenSearchClient {
             sb.append("\n");
             i++;
         }
-        //System.out.println(sb.toString());
-        String requestUrl = openSearchUrl + "/bulk?routing=" + URLEncoder
-                .encode(emitKey, StandardCharsets.UTF_8.name());
+        String requestUrl = openSearchUrl + "/_bulk";
+        if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+            requestUrl += "?routing=" + URLEncoder.encode(emitKey, StandardCharsets.UTF_8.name());
+        }
+
         JsonResponse response = postJson(requestUrl, sb.toString());
         if (response.getStatus() != 200) {
             throw new TikaClientException(response.getMsg());
@@ -97,9 +105,11 @@ public class OpenSearchClient {
             jsonGenerator.writeStartObject();
 
             writeMetadata(metadata, jsonGenerator);
-            jsonGenerator.writeStartObject("relation_type");
-            jsonGenerator.writeStringField("name", "embedded");
-            jsonGenerator.writeStringField("parent", emitKey);
-            //end the relation type object
-            jsonGenerator.writeEndObject();
+            if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+                jsonGenerator.writeObjectFieldStart("relation_type");
+                jsonGenerator.writeStringField("name", "embedded");
+                jsonGenerator.writeStringField("parent", emitKey);
+                //end the relation type object
+                jsonGenerator.writeEndObject();
+            }
             //end the metadata object
@@ -113,7 +123,9 @@ public class OpenSearchClient {
         try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
             jsonGenerator.writeStartObject();
             writeMetadata(metadata, jsonGenerator);
-            jsonGenerator.writeStringField("relation_type", "container");
+            if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD) {
+                jsonGenerator.writeStringField("relation_type", "container");
+            }
             jsonGenerator.writeEndObject();
         }
         return writer.toString();
@@ -140,7 +152,7 @@ public class OpenSearchClient {
         StringWriter writer = new StringWriter();
         try (JsonGenerator jsonGenerator = new JsonFactory().createGenerator(writer)) {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeStartObject("index");
+            jsonGenerator.writeObjectFieldStart("index");
             jsonGenerator.writeStringField("_id", id);
             if (!StringUtils.isEmpty(routing)) {
                 jsonGenerator.writeStringField("routing", routing);
@@ -152,7 +164,7 @@ public class OpenSearchClient {
         return writer.toString();
     }
 
-    protected JsonResponse postJson(String url, String json) throws IOException {
+    public JsonResponse postJson(String url, String json) throws IOException {
         HttpPost httpRequest = new HttpPost(url);
         ByteArrayEntity entity = new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
         httpRequest.setEntity(entity);
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
index b5b9820..fe878fe 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
@@ -19,17 +19,14 @@ package org.apache.tika.pipes.emitter.opensearch;
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.client.TikaClientException;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -37,18 +34,22 @@ import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
-import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.utils.StringUtils;
 
 
 public class OpenSearchEmitter extends AbstractEmitter implements Initializable {
 
+
+    public enum AttachmentStrategy {
+        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+        //anything else?
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(OpenSearchEmitter.class);
     private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
-    private String solrCollection;
 
-    private String openSearchUrl;
+    private String openSearchUrl = null;
     private String contentField = "content";
     private String idField = "_id";
     private int commitWithin = 1000;
@@ -66,9 +67,13 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
             LOG.warn("metadataList is null or empty");
             return;
         }
-        openSearchClient.addDocument(emitKey, metadataList);
+        try {
+            openSearchClient.addDocument(emitKey, metadataList);
+        } catch (TikaClientException e) {
+            throw new TikaEmitterException("failed to add document", e);
+        }
     }
-
+/*
     private void addMetadataAsSolrInputDocuments(String emitKey, List<Metadata> metadataList,
                                                  List<SolrInputDocument> docsToUpdate)
             throws IOException, TikaEmitterException {
@@ -167,7 +172,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
                 }
             }
         }
-    }
+    }*/
 
     /**
      * Options: SKIP, CONCATENATE_CONTENT, PARENT_CHILD. Default is "PARENT_CHILD".
@@ -233,6 +238,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
         this.idField = idField;
     }
 
+    //this is the full url, including the collection, e.g. https://localhost:9200/my-collection
     @Field
     public void setOpenSearchUrl(String openSearchUrl) {
         this.openSearchUrl = openSearchUrl;
@@ -270,30 +276,15 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
             throw new TikaConfigException("Must specify an open search url!");
         } else {
             openSearchClient =
-                    new OpenSearchClient(openSearchUrl, httpClientFactory.build());
+                    new OpenSearchClient(openSearchUrl, httpClientFactory.build(), attachmentStrategy);
         }
     }
 
     @Override
     public void checkInitialization(InitializableProblemHandler problemHandler)
             throws TikaConfigException {
-        mustNotBeEmpty("solrCollection", this.solrCollection);
-        mustNotBeEmpty("urlFieldName", this.idField);
-        if ((this.solrUrls == null || this.solrUrls.isEmpty()) &&
-                (this.solrZkHosts == null || this.solrZkHosts.isEmpty())) {
-            throw new IllegalArgumentException(
-                    "expected either param solrUrls or param solrZkHosts, but neither was specified");
-        }
-        if (this.solrUrls != null && !this.solrUrls.isEmpty() && this.solrZkHosts != null &&
-                !this.solrZkHosts.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "expected either param solrUrls or param solrZkHosts, but both were specified");
-        }
-    }
-
-    public enum AttachmentStrategy {
-        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
-        //anything else?
+        mustNotBeEmpty("openSearchUrl", this.openSearchUrl);
+        mustNotBeEmpty("idField", this.idField);
     }
 
 }
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties
deleted file mode 100644
index d17a4a1..0000000
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-status=info
-name=PropertiesConfig
-filters=threshold
-filter.threshold.type=ThresholdFilter
-filter.threshold.level=info
-appenders=console
-appender.console.type=Console
-appender.console.name=STDERR
-appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
-rootLogger.level=info
-rootLogger.appenderRefs=stderr
-rootLogger.appenderRef.stderr.ref=STDERR
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
index c52da5e..7959bf6 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
@@ -30,23 +30,14 @@
     </metadataFilter>
   </metadataFilters>
   <emitters>
-    <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
+    <emitter class="org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter">
       <params>
-        <name>solr1</name>
-        <url>http://localhost:8983/solr/tika-test</url>
+        <name>opensearch1</name>
+        <url>http://localhost:9200/tika-test</url>
         <attachmentStrategy>concatenate-content</attachmentStrategy>
         <contentField>content</contentField>
         <commitWithin>10</commitWithin>
       </params>
     </emitter>
-    <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
-      <params>
-        <name>solr2</name>
-        <url>http://localhost:8983/solr/tika-test</url>
-        <attachmentStrategy>parent-child</attachmentStrategy>
-        <contentField>content</contentField>
-        <commitWithin>10</commitWithin>
-      </params>
-    </emitter>
   </emitters>
 </properties>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml b/tika-pipes/tika-pipes-integration-tests/pom.xml
index d4db22b..db5805d 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-pipes-integration-tests/pom.xml
@@ -141,6 +141,12 @@
       </exclusions>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-emitter-opensearch</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 7cad1da..8112142 100644
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -50,7 +50,7 @@ import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
-@Ignore("turn these into actual tests")
+@Ignore("turn these into actual tests with mock s3")
 public class PipeIntegrationTests {
 
     private static final Path OUTDIR = Paths.get("");
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java
deleted file mode 100644
index 3d22250..0000000
--- a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.opensearch;
-
-import org.junit.Test;
-
-public class OpenSearchTest {
-    @Test
-    public void testOne() throws Exception {
-
-    }
-}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTestClient.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTestClient.java
new file mode 100644
index 0000000..297891e
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTestClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.opensearch;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import org.apache.tika.pipes.emitter.opensearch.JsonResponse;
+import org.apache.tika.pipes.emitter.opensearch.OpenSearchClient;
+import org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter;
+
+/**
+ * This expands on the OpenSearchClient for testing purposes.
+ * This has more functionality than is needed for sending docs to OpenSearch
+ */
+public class OpenSearchTestClient extends OpenSearchClient {
+
+    protected OpenSearchTestClient(String openSearchUrl, HttpClient httpClient,
+                                   OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
+        super(openSearchUrl, httpClient, attachmentStrategy);
+    }
+
+    protected JsonResponse putJson(String url, String json) throws IOException {
+        HttpPut httpRequest = new HttpPut(url);
+        ByteArrayEntity entity = new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8));
+        httpRequest.setEntity(entity);
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
+        //At one point, this was required because of connection already
+        // bound exceptions on windows :(
+        //httpPost.setHeader("Connection", "close");
+
+        //try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (status == 200) {
+                try (Reader reader = new BufferedReader(
+                        new InputStreamReader(response.getEntity().getContent(),
+                                StandardCharsets.UTF_8))) {
+                    ObjectMapper mapper = new ObjectMapper();
+                    JsonNode node = mapper.readTree(reader);
+                    return new JsonResponse(200, node);
+                }
+            } else {
+                return new JsonResponse(status,
+                        new String(EntityUtils.toByteArray(response.getEntity()),
+                                StandardCharsets.UTF_8));
+            }
+        } finally {
+            if (response != null && response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse)response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+
+    protected JsonResponse getJson(String url) throws IOException {
+        HttpGet httpRequest = new HttpGet(url);
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
+        //At one point, this was required because of connection already
+        // bound exceptions on windows :(
+        //httpPost.setHeader("Connection", "close");
+
+        //try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (status == 200) {
+                try (Reader reader = new BufferedReader(
+                        new InputStreamReader(response.getEntity().getContent(),
+                                StandardCharsets.UTF_8))) {
+                    ObjectMapper mapper = new ObjectMapper();
+                    JsonNode node = mapper.readTree(reader);
+                    return new JsonResponse(200, node);
+                }
+            } else {
+                return new JsonResponse(status,
+                        new String(EntityUtils.toByteArray(response.getEntity()),
+                                StandardCharsets.UTF_8));
+            }
+        } finally {
+            if (response != null && response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse)response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/TikaPipesOpenSearchTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/TikaPipesOpenSearchTest.java
new file mode 100644
index 0000000..a6b6a5e
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/TikaPipesOpenSearchTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.opensearch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.jetbrains.annotations.NotNull;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testcontainers.utility.DockerImageName;
+
+import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.pipes.PipeIntegrationTests;
+import org.apache.tika.pipes.emitter.opensearch.JsonResponse;
+import org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter;
+
+public class TikaPipesOpenSearchTest {
+
+    private static final String collection = "testcol";
+    private static final File testFileFolder = new File("target", "test-files");
+    private final int numDocs = 42;
+    protected GenericContainer<?> openSearch;
+    private String openSearchHost;
+    private int openSearchPort;
+    //this includes the collection, e.g. https://localhost:49213/testcol
+    private String openSearchEndpoint;
+    private OpenSearchTestClient client;
+
+    @Rule
+    public GenericContainer<?> openSearchContainer =
+            new GenericContainer<>(DockerImageName.parse(getOpenSearchImageName()))
+                    .withExposedPorts(9200)
+                    .withEnv("discovery.type", "single-node");
+
+    private String getOpenSearchImageName() {
+        return "opensearchproject/opensearch:1.0.0-rc1";
+    }
+
+    @Before
+    public void setupTest() throws Exception {
+        setupOpenSearch(openSearchContainer);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        FileUtils.deleteDirectory(testFileFolder);
+    }
+
+    @Test
+    public void testFSToOpenSearch() throws Exception {
+        //create the collection with mappings
+        String mappings = IOUtils.toString(TikaPipesOpenSearchTest.class.getResourceAsStream(
+                "/opensearch/opensearch-mappings.json"), StandardCharsets.UTF_8);
+        int status = -1;
+        int tries = 0;
+        JsonResponse response = null;
+        //OpenSearch may not be up yet: retry, pausing only after a failed attempt
+        while (status != 200 && tries++ < 3) {
+            response = client.putJson(openSearchEndpoint, mappings);
+            status = response.getStatus();
+            if (status != 200) {
+                Thread.sleep(1000);
+            }
+        }
+        if (status != 200) {
+            throw new IllegalArgumentException("couldn't create index/add mappings");
+        }
+        assertTrue(response.getJson().get("acknowledged").asBoolean());
+        assertEquals("testcol", response.getJson().get("index").asText());
+
+        runPipes(OpenSearchEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+        //refresh to make sure the content is searchable
+        JsonResponse refresh = client.getJson(openSearchEndpoint + "/_refresh");
+
+        String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+                "\"query\": \"initial\" } } } }";
+
+        JsonResponse results = client.postJson(openSearchEndpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+
+        assertEquals(numDocs, results.getJson().get("hits").get("total").get("value").asInt());
+
+    }
+
+    private void runPipes(OpenSearchEmitter.AttachmentStrategy attachmentStrategy) throws Exception {
+
+        File tikaConfigFile = new File("target", "ta-opensearch.xml");
+        File log4jPropFile = new File("target", "tmp-log4j2.xml");
+        try (InputStream is = PipeIntegrationTests.class
+                .getResourceAsStream("/pipes-fork-server-custom-log4j2.xml")) {
+            FileUtils.copyInputStreamToFile(is, log4jPropFile);
+        }
+        String tikaConfigTemplateXml;
+        try (InputStream is = PipeIntegrationTests.class
+                .getResourceAsStream("/opensearch/tika-config-opensearch.xml")) {
+            tikaConfigTemplateXml = IOUtils.toString(is, StandardCharsets.UTF_8);
+        }
+
+        String tikaConfigXml =
+                createTikaConfigXml(tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
+                        attachmentStrategy);
+        FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
+
+        TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
+
+
+    }
+
+    @NotNull
+    private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile, String tikaConfigTemplateXml,
+                                       OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
+        String res =
+                tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
+                        .replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
+                        .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
+                        .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath());
+        //literal replace throughout: replaceAll would treat '\' or '$' in a path as regex syntax
+        res = res.replace("{OPENSEARCH_CONNECTION}", openSearchEndpoint);
+
+        return res;
+
+    }
+
+    private void setupOpenSearch(GenericContainer<?> openSearchContainer) throws Exception {
+        createTestHtmlFiles("initial");
+        this.openSearch = openSearchContainer;
+        openSearchHost = openSearch.getHost();
+        openSearchPort = openSearch.getMappedPort(9200);
+        openSearchEndpoint = "https://" + openSearchHost + ":" + openSearchPort + "/" + collection;
+        HttpClientFactory httpClientFactory = new HttpClientFactory();
+        httpClientFactory.setUserName("admin");
+        httpClientFactory.setPassword("admin");
+        //attachment strategy is not used here...TODO clean this up
+        client = new OpenSearchTestClient(openSearchEndpoint,
+                httpClientFactory.build(), OpenSearchEmitter.AttachmentStrategy.SKIP);
+    }
+
+    private void createTestHtmlFiles(String bodyContent) throws Exception {
+        testFileFolder.mkdirs();
+        for (int i = 0; i < numDocs; ++i) {
+            FileUtils.writeStringToFile(new File(testFileFolder, "test-" + i + ".html"),
+                    "<html><body>" + bodyContent +  "</body></html>", StandardCharsets.UTF_8);
+        }
+    }
+
+
+}
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-mappings.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
new file mode 100644
index 0000000..4b87142
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-mappings.json
@@ -0,0 +1,16 @@
+{
+  "settings": {
+    "number_of_shards": 1
+  },
+  "mappings" : {
+    "dynamic": false,
+    "properties" : {
+      "content" : { "type" : "text"},
+      "length" : { "type" : "long"},
+      "creators" : { "type" : "text"},
+      "title" : { "type" : "text"},
+      "mime" : { "type" : "keyword"},
+      "tika_exception" : { "type" : "text"}
+    }
+  }
+}
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
new file mode 100644
index 0000000..def8438
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/opensearch-parent-child-mappings.json
@@ -0,0 +1,26 @@
+{
+  "settings": {
+    "number_of_shards": 1
+  },
+  "mappings" : {
+    "dynamic": false,
+    "_routing": {
+      "required":true
+    },
+    "properties" : {
+      "content" : { "type" : "text"},
+      "length" : { "type" : "long"},
+      "creators" : { "type" : "text"},
+      "title" : { "type" : "text"},
+      "mime" : { "type" : "keyword"},
+      "tika_exception" : { "type" : "text"},
+      "relation_type":{
+        "type":"join",
+        "eager_global_ordinals":true,
+        "relations":{
+          "container":"embedded"
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
new file mode 100644
index 0000000..ccb0f8f
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+  <parsers> <!-- DefaultParser minus four excluded parsers; three of those are re-declared below with explicit params -->
+    <parser class="org.apache.tika.parser.DefaultParser">
+      <parser-exclude class="org.apache.tika.parser.ocr.TesseractOCRParser"/> <!-- excluded and not re-declared in this file -->
+      <parser-exclude class="org.apache.tika.parser.pdf.PDFParser"/>
+      <parser-exclude class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser"/>
+      <parser-exclude class="org.apache.tika.parser.microsoft.OfficeParser"/>
+    </parser>
+    <parser class="org.apache.tika.parser.pdf.PDFParser"> <!-- re-declared here so the params below can be set -->
+      <params>
+        <param name="extractActions" type="bool">true</param>
+        <param name="checkExtractAccessPermissions" type="bool">true</param>
+      </params>
+    </parser>
+    <parser class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser"> <!-- re-declared here so the params below can be set -->
+      <params>
+        <param name="includeDeletedContent" type="bool">true</param>
+        <param name="includeMoveFromContent" type="bool">true</param>
+        <param name="extractMacros" type="bool">true</param>
+      </params>
+    </parser>
+    <parser class="org.apache.tika.parser.microsoft.OfficeParser"> <!-- re-declared here so the param below can be set -->
+      <params>
+        <param name="extractMacros" type="bool">true</param>
+      </params>
+    </parser>
+  </parsers>
+  <metadataFilters> <!-- renames Tika metadata keys to the field names used in the OpenSearch index -->
+    <metadataFilter class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
+      <params>
+        <excludeUnmapped>true</excludeUnmapped> <!-- presumably drops keys that have no mapping below; confirm against FieldNameMappingFilter -->
+        <mappings> <!-- the "to" names match the properties in opensearch-mappings.json and opensearch-parent-child-mappings.json -->
+          <mapping from="X-TIKA:content" to="content"/>
+          <mapping from="Content-Length" to="length"/>
+          <mapping from="dc:creator" to="creators"/>
+          <mapping from="dc:title" to="title"/>
+          <mapping from="Content-Type" to="mime"/>
+          <mapping from="X-TIKA:EXCEPTION:container_exception" to="tika_exception"/>
+        </mappings>
+      </params>
+    </metadataFilter>
+  </metadataFilters>
+  <async> <!-- settings for the async pipes processor; tokens in braces look like placeholders, presumably filled in by the test (TODO confirm) -->
+    <params>
+      <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+      <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+      <emitWithinMillis>60000</emitWithinMillis>
+      <numEmitters>1</numEmitters>
+      <numClients>1</numClients>
+      <tikaConfig>{TIKA_CONFIG}</tikaConfig> <!-- placeholder token, not a literal path -->
+      <forkedJvmArgs> <!-- arguments for the forked JVM, per the element name -->
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+        <arg>-XX:+ExitOnOutOfMemoryError</arg>
+        <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg> <!-- placeholder token for the logging config location -->
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+    </params>
+  </async>
+  <fetchers> <!-- a single file system fetcher for the test documents -->
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <params>
+        <name>fsf</name> <!-- referenced by fetcherName in the pipesIterator section -->
+        <basePath>{PATH_TO_DOCS}</basePath> <!-- placeholder token; the pipesIterator basePath uses the same one -->
+      </params>
+    </fetcher>
+  </fetchers>
+  <emitters> <!-- a single OpenSearch emitter; tokens in braces look like placeholders, presumably filled in by the test (TODO confirm) -->
+    <emitter class="org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter">
+      <params>
+        <name>ose</name> <!-- referenced by emitterName in the pipesIterator section -->
+        <openSearchUrl>{OPENSEARCH_CONNECTION}</openSearchUrl>
+<!--    TODO: implement this
+        <updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
+        -->
+        <attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
+        <contentField>content</contentField> <!-- same name as the target of the X-TIKA:content mapping in metadataFilters -->
+        <commitWithin>10</commitWithin>
+        <idField>_id</idField>
+        <connectionTimeout>10000</connectionTimeout>
+        <socketTimeout>60000</socketTimeout>
+        <userName>admin</userName> <!-- test-only credentials; same values the test sets on its HttpClientFactory -->
+        <password>admin</password>
+      </params>
+    </emitter>
+  </emitters>
+  <pipesIterator class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator"> <!-- iterates over the same base path the fetcher reads from -->
+    <params>
+      <basePath>{PATH_TO_DOCS}</basePath> <!-- same placeholder token as the fetcher basePath -->
+      <fetcherName>fsf</fetcherName> <!-- name of the FileSystemFetcher declared in the fetchers section -->
+      <emitterName>ose</emitterName> <!-- name of the OpenSearchEmitter declared in the emitters section -->
+    </params>
+  </pipesIterator>
+</properties>