You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/06/10 15:30:02 UTC

[tika] branch TIKA-3440 created (now 586a823)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch TIKA-3440
in repository https://gitbox.apache.org/repos/asf/tika.git.


      at 586a823  TIKA-3440 -- WIP do not merge

This branch includes the following new commits:

     new 586a823  TIKA-3440 -- WIP do not merge

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[tika] 01/01: TIKA-3440 -- WIP do not merge

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-3440
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 586a823d73e42a1c13da2eb4b47f7cf79a4a0dba
Author: tallison <ta...@apache.org>
AuthorDate: Thu Jun 10 11:29:36 2021 -0400

    TIKA-3440 -- WIP do not merge
---
 tika-pipes/tika-emitters/pom.xml                   |   1 +
 .../tika-emitters/tika-emitter-opensearch/pom.xml  | 140 ++++++++++
 .../pipes/emitter/opensearch/JsonResponse.java     |  60 +++++
 .../pipes/emitter/opensearch/OpenSearchClient.java | 194 +++++++++++++
 .../emitter/opensearch/OpenSearchEmitter.java      | 299 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  28 ++
 .../test/resources/tika-config-simple-emitter.xml  |  52 ++++
 .../tika/pipes/opensearch/OpenSearchTest.java      |  26 ++
 8 files changed, 800 insertions(+)

diff --git a/tika-pipes/tika-emitters/pom.xml b/tika-pipes/tika-emitters/pom.xml
index 541db31..543d8e7 100644
--- a/tika-pipes/tika-emitters/pom.xml
+++ b/tika-pipes/tika-emitters/pom.xml
@@ -35,6 +35,7 @@
     <module>tika-emitter-fs</module>
     <module>tika-emitter-s3</module>
     <module>tika-emitter-solr</module>
+    <module>tika-emitter-opensearch</module>
   </modules>
 
 
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/pom.xml b/tika-pipes/tika-emitters/tika-emitter-opensearch/pom.xml
new file mode 100644
index 0000000..eeedad4
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tika-emitters</artifactId>
+    <groupId>org.apache.tika</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-emitter-opensearch</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-httpclient-commons</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tika</groupId>
+      <artifactId>tika-serialization</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-io</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <Automatic-Module-Name>org.apache.tika.pipes.emitter.opensearch</Automatic-Module-Name>
+            </manifestEntries>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${maven.shade.version}</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>
+                false
+              </createDependencyReducedPom>
+              <!-- <filters> -->
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*</exclude>
+                    <exclude>LICENSE.txt</exclude>
+                    <exclude>NOTICE.txt</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/LICENSE</resource>
+                  <file>target/classes/META-INF/LICENSE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/NOTICE</resource>
+                  <file>target/classes/META-INF/NOTICE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/DEPENDENCIES</resource>
+                  <file>target/classes/META-INF/DEPENDENCIES</file>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/JsonResponse.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/JsonResponse.java
new file mode 100644
index 0000000..9a5d364
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/JsonResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.opensearch;
+
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class JsonResponse {
+
+    private final int status;
+    private final String msg;
+    private final JsonNode root;
+
+    public JsonResponse(int status, JsonNode root) {
+        this.status = status;
+        this.root = root;
+        this.msg = null;
+    }
+
+    public JsonResponse(int status, String msg) {
+        this.status = status;
+        this.msg = msg;
+        this.root = null;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public JsonNode getJson() {
+        return root;
+    }
+
+    @Override
+    public String toString() {
+        return "JsonResponse{" +
+                "status=" + status +
+                ", msg='" + msg + '\'' +
+                ", root=" + root +
+                '}';
+    }
+}
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
new file mode 100644
index 0000000..3e60c38
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.opensearch;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.TikaClientException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * Minimal http client that sends Tika {@link Metadata} objects to an
+ * OpenSearch index through the bulk endpoint.
+ * <p>
+ * The first metadata object of a list is written with
+ * {@code "relation_type": "container"}; every following one is written with a
+ * {@code relation_type} object whose {@code name} is {@code embedded} and
+ * whose {@code parent} is the emit key of the container.
+ */
+public class OpenSearchClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchClient.class);
+
+    //Jackson documents both as reusable once configured, so build them only once
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    //this includes the full url and the index, should not end in /
+    //e.g. https://localhost:9200/my-index
+    private final String openSearchUrl;
+    private final HttpClient httpClient;
+
+    /**
+     * Not private: OpenSearchEmitter in this package has to instantiate the client.
+     *
+     * @param openSearchUrl full url including the index, without a trailing slash
+     * @param httpClient the client used for every request
+     */
+    protected OpenSearchClient(String openSearchUrl, HttpClient httpClient) {
+        this.openSearchUrl = openSearchUrl;
+        this.httpClient = httpClient;
+    }
+
+    /**
+     * Sends the metadata list to the index in a single bulk request.
+     *
+     * @param emitKey id of the container document; also used as the routing value
+     * @param metadataList container metadata first, followed by its attachments
+     * @throws IOException on a failure of the http call or of the json handling
+     * @throws TikaClientException if the status is not 200, if the response has no
+     *                             json body or if the response reports errors
+     */
+    public void addDocument(String emitKey, List<Metadata> metadataList) throws IOException,
+            TikaClientException {
+        StringBuilder sb = new StringBuilder();
+        int i = 0;
+        for (Metadata metadata : metadataList) {
+            //the container keeps the emitKey as its id, attachments get "-<index>" appended
+            String id = (i == 0) ? emitKey : emitKey + "-" + i;
+            sb.append(getBulkIndexJson(id, emitKey)).append("\n");
+            if (i == 0) {
+                sb.append(metadataToJsonContainer(metadata));
+            } else {
+                sb.append(metadataToJsonEmbedded(metadata, emitKey));
+            }
+            sb.append("\n");
+            i++;
+        }
+        //the bulk endpoint is "_bulk"
+        String requestUrl = openSearchUrl + "/_bulk?routing=" + URLEncoder
+                .encode(emitKey, StandardCharsets.UTF_8.name());
+        JsonResponse response = postJson(requestUrl, sb.toString());
+        if (response.getStatus() != 200) {
+            throw new TikaClientException(response.getMsg());
+        }
+        JsonNode json = response.getJson();
+        if (json == null) {
+            throw new TikaClientException("no json in the bulk response: " + response);
+        }
+        //if there's a single error, throw the full json.
+        //this has not been thoroughly tested with versions of es < 7
+        //path() never returns null, so a response without "errors" is treated as error free
+        if ("true".equals(json.path("errors").asText())) {
+            throw new TikaClientException(json.toString());
+        }
+    }
+
+    /**
+     * Serializes an attachment: its metadata plus the relation_type object
+     * that names the container as parent.
+     */
+    private String metadataToJsonEmbedded(Metadata metadata, String emitKey) throws IOException {
+        StringWriter writer = new StringWriter();
+        try (JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+
+            writeMetadata(metadata, jsonGenerator);
+            //writeStartObject(Object) would not write the field name
+            jsonGenerator.writeObjectFieldStart("relation_type");
+            jsonGenerator.writeStringField("name", "embedded");
+            jsonGenerator.writeStringField("parent", emitKey);
+            //end the relation type object
+            jsonGenerator.writeEndObject();
+            //end the metadata object
+            jsonGenerator.writeEndObject();
+        }
+        return writer.toString();
+    }
+
+    /**
+     * Serializes the container: its metadata plus "relation_type": "container".
+     */
+    private String metadataToJsonContainer(Metadata metadata) throws IOException {
+        StringWriter writer = new StringWriter();
+        try (JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+            writeMetadata(metadata, jsonGenerator);
+            jsonGenerator.writeStringField("relation_type", "container");
+            jsonGenerator.writeEndObject();
+        }
+        return writer.toString();
+    }
+
+    /**
+     * Writes the metadata without the start { or the end }
+     * to allow for other fields to be added. A key with exactly one value
+     * becomes a string field, any other key becomes an array of strings.
+     */
+    private void writeMetadata(Metadata metadata, JsonGenerator jsonGenerator) throws IOException {
+        for (String n : metadata.names()) {
+            String[] vals = metadata.getValues(n);
+            if (vals.length == 1) {
+                jsonGenerator.writeStringField(n, vals[0]);
+            } else {
+                //writeStartArray(Object) would not write the field name
+                jsonGenerator.writeArrayFieldStart(n);
+                for (String v : vals) {
+                    jsonGenerator.writeString(v);
+                }
+                jsonGenerator.writeEndArray();
+            }
+        }
+    }
+
+    /**
+     * Builds the action line that precedes each document in the bulk request,
+     * e.g. {"index":{"_id":"...","routing":"..."}}. The routing field is left
+     * out if the routing value is empty.
+     */
+    private String getBulkIndexJson(String id, String routing) throws IOException {
+        StringWriter writer = new StringWriter();
+        try (JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(writer)) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeObjectFieldStart("index");
+            jsonGenerator.writeStringField("_id", id);
+            if (!StringUtils.isEmpty(routing)) {
+                jsonGenerator.writeStringField("routing", routing);
+            }
+
+            jsonGenerator.writeEndObject();
+            jsonGenerator.writeEndObject();
+        }
+        return writer.toString();
+    }
+
+    /**
+     * Posts the json as UTF-8 and wraps the result.
+     *
+     * @param url the url to post to
+     * @param json the request body
+     * @return for status 200 the parsed json tree; for any other status, or if
+     *         the response has no body, the status with the body (or an empty
+     *         string) as message
+     * @throws IOException on a failure of the http call or of the json parsing
+     */
+    protected JsonResponse postJson(String url, String json) throws IOException {
+        HttpPost httpRequest = new HttpPost(url);
+        httpRequest.setEntity(new ByteArrayEntity(json.getBytes(StandardCharsets.UTF_8)));
+        httpRequest.setHeader("Accept", "application/json");
+        httpRequest.setHeader("Content-type", "application/json; charset=utf-8");
+
+        HttpResponse response = null;
+        try {
+            response = httpClient.execute(httpRequest);
+            int status = response.getStatusLine().getStatusCode();
+            if (response.getEntity() == null) {
+                //nothing to parse or to report
+                return new JsonResponse(status, "");
+            }
+            if (status != 200) {
+                return new JsonResponse(status,
+                        new String(EntityUtils.toByteArray(response.getEntity()),
+                                StandardCharsets.UTF_8));
+            }
+            try (Reader reader = new BufferedReader(
+                    new InputStreamReader(response.getEntity().getContent(),
+                            StandardCharsets.UTF_8))) {
+                JsonNode node = OBJECT_MAPPER.readTree(reader);
+                LOG.trace("node: {}", node);
+                return new JsonResponse(200, node);
+            }
+        } finally {
+            if (response instanceof CloseableHttpResponse) {
+                ((CloseableHttpResponse) response).close();
+            }
+            httpRequest.releaseConnection();
+        }
+    }
+}
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
new file mode 100644
index 0000000..b5b9820
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.emitter.opensearch;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.apache.tika.utils.StringUtils;
+
+
+/**
+ * Emitter that sends extracted metadata to an OpenSearch index through
+ * {@link OpenSearchClient}.
+ * <p>
+ * The url, timeouts, credentials and proxy are set through the {@link Field}
+ * setters; the client itself is built in {@link #initialize(Map)}.
+ */
+public class OpenSearchEmitter extends AbstractEmitter implements Initializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchEmitter.class);
+    private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
+
+    private String openSearchUrl;
+    private String contentField = "content";
+    private String idField = "_id";
+    private int commitWithin = 1000;
+    private OpenSearchClient openSearchClient;
+    private final HttpClientFactory httpClientFactory;
+
+    public OpenSearchEmitter() throws TikaConfigException {
+        httpClientFactory = new HttpClientFactory();
+    }
+
+    /**
+     * Emits one document and, depending on the attachment strategy, its attachments.
+     * A null or empty list is logged and ignored.
+     *
+     * @param emitKey id of the main document
+     * @param metadataList metadata of the main document first, followed by its attachments
+     * @throws IOException if the client fails with an i/o problem
+     * @throws TikaEmitterException if the client rejects the request
+     */
+    @Override
+    public void emit(String emitKey, List<Metadata> metadataList)
+            throws IOException, TikaEmitterException {
+        if (metadataList == null || metadataList.size() == 0) {
+            LOG.warn("metadataList is null or empty");
+            return;
+        }
+        try {
+            openSearchClient.addDocument(emitKey, applyAttachmentStrategy(metadataList));
+        } catch (org.apache.tika.client.TikaClientException e) {
+            //the emitter contract only allows IOException and TikaEmitterException
+            throw new TikaEmitterException("Could not add " + emitKey + " to OpenSearch", e);
+        }
+    }
+
+    /**
+     * Emits every entry of the batch, one client request per entry.
+     * A null or empty batch is logged and ignored.
+     *
+     * @param batch the documents to emit
+     * @throws IOException if the client fails with an i/o problem
+     * @throws TikaEmitterException if the client rejects a request
+     */
+    @Override
+    public void emit(List<? extends EmitData> batch) throws IOException, TikaEmitterException {
+        if (batch == null || batch.size() == 0) {
+            LOG.warn("batch is null or empty");
+            return;
+        }
+        for (EmitData d : batch) {
+            emit(d.getEmitKey().getEmitKey(), d.getMetadataList());
+        }
+    }
+
+    /**
+     * Reduces the metadata list according to the attachment strategy.
+     * PARENT_CHILD sends the full list. SKIP sends only the first element.
+     * CONCATENATE_CONTENT joins the content field of all elements, stores the
+     * result in the first element (which is modified) and sends only that one.
+     */
+    private List<Metadata> applyAttachmentStrategy(List<Metadata> metadataList) {
+        if (metadataList.size() == 1 || attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
+            return metadataList;
+        }
+        if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
+            //this only handles text for now, not xhtml
+            StringBuilder sb = new StringBuilder();
+            for (Metadata metadata : metadataList) {
+                String content = metadata.get(getContentField());
+                if (content != null) {
+                    sb.append(content).append("\n");
+                }
+            }
+            metadataList.get(0).set(getContentField(), sb.toString());
+        }
+        //SKIP and CONCATENATE_CONTENT both index the main document only
+        return metadataList.subList(0, 1);
+    }
+
+    /**
+     * Options: SKIP, CONCATENATE_CONTENT, PARENT_CHILD. Default is "PARENT_CHILD".
+     * If set to "SKIP", this will index only the main file and ignore all info
+     * in the attachments.  If set to "CONCATENATE_CONTENT", this will concatenate the
+     * content extracted from the attachments into the main document and
+     * then index the main document with the concatenated content _and_ the
+     * main document's metadata (metadata from attachments will be thrown away).
+     * If set to "PARENT_CHILD", this will index the attachments as children
+     * of the parent document via OpenSearch's parent-child relationship.
+     *
+     * @param attachmentStrategy exact name of an {@link AttachmentStrategy} constant
+     * @throws IllegalArgumentException if the name matches no constant
+     */
+    @Field
+    public void setAttachmentStrategy(String attachmentStrategy) {
+        this.attachmentStrategy = AttachmentStrategy.valueOf(attachmentStrategy);
+    }
+
+
+    @Field
+    public void setConnectionTimeout(int connectionTimeout) {
+        httpClientFactory.setConnectTimeout(connectionTimeout);
+    }
+
+    @Field
+    public void setSocketTimeout(int socketTimeout) {
+        httpClientFactory.setSocketTimeout(socketTimeout);
+    }
+
+    public String getContentField() {
+        return contentField;
+    }
+
+    /**
+     * This is the field _after_ metadata mappings have been applied
+     * that contains the "content" for each metadata object.
+     * <p>
+     * This is the field that is used if {@link #attachmentStrategy}
+     * is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
+     *
+     * @param contentField name of the content field
+     */
+    @Field
+    public void setContentField(String contentField) {
+        this.contentField = contentField;
+    }
+
+    public int getCommitWithin() {
+        return commitWithin;
+    }
+
+    /**
+     * Stores the value; this class does not yet apply it to any request.
+     *
+     * @param commitWithin the value to store
+     */
+    @Field
+    public void setCommitWithin(int commitWithin) {
+        this.commitWithin = commitWithin;
+    }
+
+    /**
+     * Stores the name of the id field. This class does not yet read the id
+     * from the metadata; the emit key is passed to the client as the id.
+     *
+     * @param idField name of the id field, must not be empty
+     */
+    @Field
+    public void setIdField(String idField) {
+        this.idField = idField;
+    }
+
+    /**
+     * @param openSearchUrl the url that is handed to the {@link OpenSearchClient}
+     */
+    @Field
+    public void setOpenSearchUrl(String openSearchUrl) {
+        this.openSearchUrl = openSearchUrl;
+    }
+
+    //TODO -- add other httpclient configurations??
+    @Field
+    public void setUserName(String userName) {
+        httpClientFactory.setUserName(userName);
+    }
+
+    @Field
+    public void setPassword(String password) {
+        httpClientFactory.setPassword(password);
+    }
+
+    @Field
+    public void setAuthScheme(String authScheme) {
+        httpClientFactory.setAuthScheme(authScheme);
+    }
+
+    @Field
+    public void setProxyHost(String proxyHost) {
+        httpClientFactory.setProxyHost(proxyHost);
+    }
+
+    @Field
+    public void setProxyPort(int proxyPort) {
+        httpClientFactory.setProxyPort(proxyPort);
+    }
+
+    /**
+     * Builds the client from the configured url and http settings.
+     *
+     * @throws TikaConfigException if no url was configured
+     */
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        if (StringUtils.isBlank(openSearchUrl)) {
+            throw new TikaConfigException("Must specify an open search url!");
+        }
+        openSearchClient = new OpenSearchClient(openSearchUrl, httpClientFactory.build());
+    }
+
+    /**
+     * Verifies that the url and the id field have been set.
+     */
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        mustNotBeEmpty("openSearchUrl", this.openSearchUrl);
+        mustNotBeEmpty("idField", this.idField);
+    }
+
+    public enum AttachmentStrategy {
+        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+        //anything else?
+    }
+
+}
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d17a4a1
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+status=info
+name=PropertiesConfig
+filters=threshold
+filter.threshold.type=ThresholdFilter
+filter.threshold.level=info
+appenders=console
+appender.console.type=Console
+appender.console.name=STDERR
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
+rootLogger.level=info
+rootLogger.appenderRefs=stderr
+rootLogger.appenderRef.stderr.ref=STDERR
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
new file mode 100644
index 0000000..c52da5e
--- /dev/null
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/test/resources/tika-config-simple-emitter.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+  <metadataFilters>
+    <metadataFilter class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
+      <mappings>
+        <mapping from="X-TIKA:content" to="content"/>
+        <mapping from="X-TIKA:embedded_resource_path" to="embedded_path"/>
+        <mapping from="Content-Length" to="length"/>
+        <mapping from="dc:creator" to="creators"/>
+        <mapping from="dc:title" to="title"/>
+      </mappings>
+    </metadataFilter>
+  </metadataFilters>
+  <emitters> <!-- NOTE(review): both entries use SolrEmitter and a Solr URL, but this config sits under tika-emitter-opensearch; looks copied from the Solr module, confirm before merge -->
+    <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
+      <params>
+        <name>solr1</name>
+        <url>http://localhost:8983/solr/tika-test</url>
+        <attachmentStrategy>concatenate-content</attachmentStrategy>
+        <contentField>content</contentField>
+        <commitWithin>10</commitWithin>
+      </params>
+    </emitter>
+    <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
+      <params>
+        <name>solr2</name>
+        <url>http://localhost:8983/solr/tika-test</url>
+        <attachmentStrategy>parent-child</attachmentStrategy>
+        <contentField>content</contentField>
+        <commitWithin>10</commitWithin>
+      </params>
+    </emitter>
+  </emitters>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java
new file mode 100644
index 0000000..3d22250
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/OpenSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.opensearch;
+
+import org.junit.Test;
+
+public class OpenSearchTest {
+    @Test
+    public void testOne() throws Exception {
+        // Placeholder: the body is intentionally empty, so this test asserts nothing yet.
+    }
+}