Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/22 20:26:57 UTC

[tika] branch main updated: TIKA-3494 -- allow legacy concatenation of content via standard AutoDetectParser instead of RecursiveParserWrapper, numerous other bug fixes

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new a8f8bbc  TIKA-3494 -- allow legacy concatenation of content via standard AutoDetectParser instead of RecursiveParserWrapper, numerous other bug fixes
a8f8bbc is described below

commit a8f8bbc714ace8001b62cab22e70f9054c09e96a
Author: tallison <ta...@apache.org>
AuthorDate: Thu Jul 22 16:26:35 2021 -0400

    TIKA-3494 -- allow legacy concatenation of content via standard AutoDetectParser instead of RecursiveParserWrapper, numerous other bug fixes
---
 CHANGES.txt                                        |   4 +
 .../java/org/apache/tika/config/ConfigBase.java    |   1 -
 .../java/org/apache/tika/pipes/HandlerConfig.java  |  62 +++++-
 .../java/org/apache/tika/pipes/PipesServer.java    |  89 +++++++--
 .../apache/tika/pipes/async/AsyncProcessor.java    |   6 +-
 .../tika/pipes/pipesiterator/PipesIterator.java    |  16 +-
 .../filelist/FileListPipesIterator.java            |   7 +-
 .../pipesiterator/fs/FileSystemPipesIterator.java  |   3 +-
 .../opensearch/tests/TikaPipesOpenSearchTest.java  | 208 +++++++++++++++++----
 .../opensearch/tika-config-opensearch.xml          |   7 +-
 .../pipes/solr/tests/TikaPipesSolrTestBase.java    |  19 +-
 .../src/test/resources/tika-config-solr-urls.xml   |   4 +-
 .../pipes/emitter/opensearch/OpenSearchClient.java |   2 +
 .../emitter/opensearch/OpenSearchEmitter.java      | 165 ++--------------
 .../tika/pipes/emitter/solr/SolrEmitter.java       |  49 ++---
 .../solr}/SolrPipesIterator.java                   |   2 +-
 .../metadata/serialization/JsonFetchEmitTuple.java |  10 +-
 .../serialization/JsonFetchEmitTupleTest.java      |   4 +-
 .../core/resource/RecursiveMetadataResource.java   |   9 +-
 .../org/apache/tika/server/core/TikaPipesTest.java |   3 +-
 20 files changed, 394 insertions(+), 276 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2a82d28..1d7818f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,9 @@
 Release 2.0.1 - ???
 
+   * Breaking change in the Solr and OpenSearch emitters: the SKIP and
+     CONCATENATE_CONTENT attachment strategies have been removed. Set the
+     parseMode on the pipesIterator or in the FetchEmitTuple instead (TIKA-3494).
+
    * Fix serialization of embedded docs in OpenSearch emitter (TIKA-3490).
 
 Release 2.0.0 - 07/07/2021
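
For reference, a minimal sketch of the migration path described in the note
above. The constructor and enum signatures are taken from this commit; the
fetcher/emitter names and keys are illustrative only. In a pipesIterator
config, the same request is expressed with <parseMode>concatenate</parseMode>
(the value is parsed case-insensitively).

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.FetchEmitTuple;
    import org.apache.tika.pipes.HandlerConfig;
    import org.apache.tika.pipes.emitter.EmitKey;
    import org.apache.tika.pipes.fetcher.FetchKey;
    import org.apache.tika.sax.BasicContentHandlerFactory;

    //request legacy-style concatenation per tuple, rather than in the emitter
    HandlerConfig handlerConfig = new HandlerConfig(
            BasicContentHandlerFactory.HANDLER_TYPE.TEXT,
            HandlerConfig.PARSE_MODE.CONCATENATE,
            -1,  //writeLimit: unlimited
            -1); //maxEmbeddedResources: unlimited
    FetchEmitTuple tuple = new FetchEmitTuple("my-doc.docx",
            new FetchKey("fsf", "my-doc.docx"),
            new EmitKey("ose", "my-doc.docx"),
            new Metadata(), handlerConfig,
            FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
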
diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index 7b1b436..20f110c 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -372,7 +372,6 @@ public abstract class ConfigBase {
                     } catch (IllegalAccessException | InvocationTargetException e) {
                         throw new TikaConfigException("bad parameter " + setter, e);
                     }
-
                 } else {
                     try {
                         m.invoke(object, value);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
index 5272642..7fdbc3c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
@@ -17,6 +17,7 @@
 package org.apache.tika.pipes;
 
 import java.io.Serializable;
+import java.util.Locale;
 import java.util.Objects;
 
 import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -29,16 +30,57 @@ public class HandlerConfig implements Serializable {
     private static final long serialVersionUID = -3861669115439125268L;
 
     public static HandlerConfig DEFAULT_HANDLER_CONFIG =
-            new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1, -1);
+            new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, PARSE_MODE.RMETA,
+                    -1, -1);
+
+    /**
+     * {@link PARSE_MODE#RMETA} "recursive metadata" is the same as the -J option
+     * in tika-app and the /rmeta endpoint in tika-server.  Each embedded file is represented as
+     * its own metadata object.
+     *
+     * {@link PARSE_MODE#CONCATENATE} is similar
+     * to the legacy tika-app behavior and the /tika endpoint (accept: application/json) in
+     * tika-server.  This concatenates the
+     * contents of embedded files and returns a single metadata object for the file no
+     * matter how many embedded objects there are; this option throws away metadata from
+     * embedded objects and silently skips exceptions in embedded objects.
+     */
+    public enum PARSE_MODE {
+        RMETA,
+        CONCATENATE;
+
+        public static PARSE_MODE parseMode(String modeString) {
+            for (PARSE_MODE m : PARSE_MODE.values()) {
+                if (m.name().equalsIgnoreCase(modeString)) {
+                    return m;
+                }
+            }
+            StringBuilder sb = new StringBuilder();
+            int i = 0;
+            for (PARSE_MODE m : PARSE_MODE.values()) {
+                if (i++ > 0) {
+                    sb.append(", ");
+                }
+                sb.append(m.name().toLowerCase(Locale.US));
+            }
+            throw new IllegalArgumentException("mode must be one of: (" + sb +
+                    "). I regret I do not understand: " + modeString);
+        }
+    }
 
     private BasicContentHandlerFactory.HANDLER_TYPE type =
             BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+
     int writeLimit = -1;
     int maxEmbeddedResources = -1;
+    PARSE_MODE parseMode = PARSE_MODE.RMETA;
+
 
-    public HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE type, int writeLimit,
+    public HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE type, PARSE_MODE parseMode,
+                         int writeLimit,
                          int maxEmbeddedResources) {
         this.type = type;
+        this.parseMode = parseMode;
         this.writeLimit = writeLimit;
         this.maxEmbeddedResources = maxEmbeddedResources;
     }
@@ -55,10 +97,8 @@ public class HandlerConfig implements Serializable {
         return maxEmbeddedResources;
     }
 
-    @Override
-    public String toString() {
-        return "HandlerConfig{" + "type=" + type + ", writeLimit=" + writeLimit +
-                ", maxEmbeddedResources=" + maxEmbeddedResources + '}';
+    public PARSE_MODE getParseMode() {
+        return parseMode;
     }
 
     @Override
@@ -71,11 +111,17 @@ public class HandlerConfig implements Serializable {
         }
         HandlerConfig that = (HandlerConfig) o;
         return writeLimit == that.writeLimit && maxEmbeddedResources == that.maxEmbeddedResources &&
-                type == that.type;
+                type == that.type && parseMode == that.parseMode;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(type, writeLimit, maxEmbeddedResources);
+        return Objects.hash(type, writeLimit, maxEmbeddedResources, parseMode);
+    }
+
+    @Override
+    public String toString() {
+        return "HandlerConfig{" + "type=" + type + ", writeLimit=" + writeLimit +
+                ", maxEmbeddedResources=" + maxEmbeddedResources + ", mode=" + parseMode + '}';
     }
 }
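
A quick illustration of the string parsing added above; the behavior follows
directly from the enum code, and the inputs are illustrative:

    HandlerConfig.PARSE_MODE.parseMode("rmeta");       //-> RMETA
    HandlerConfig.PARSE_MODE.parseMode("CONCATENATE"); //-> CONCATENATE (case-insensitive)
    HandlerConfig.PARSE_MODE.parseMode("nope");
    //-> IllegalArgumentException("mode must be one of: (rmeta, concatenate). ...")
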
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 0e117dd..68b304f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -28,15 +28,18 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.EncryptedDocumentException;
 import org.apache.tika.exception.TikaException;
+import org.apache.tika.extractor.DocumentSelector;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.AutoDetectParser;
@@ -51,6 +54,7 @@ import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.ContentHandlerFactory;
 import org.apache.tika.sax.RecursiveParserWrapperHandler;
 import org.apache.tika.utils.ExceptionUtils;
 import org.apache.tika.utils.StringUtils;
@@ -116,7 +120,8 @@ public class PipesServer implements Runnable {
     private final long maxExtractSizeToReturn;
     private final long serverParseTimeoutMillis;
     private final long serverWaitTimeoutMillis;
-    private Parser parser;
+    private Parser autoDetectParser;
+    private Parser rMetaParser;
     private TikaConfig tikaConfig;
     private FetcherManager fetcherManager;
     private EmitterManager emitterManager;
@@ -307,7 +312,7 @@ public class PipesServer implements Runnable {
 
         Metadata metadata = new Metadata();
         try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
-            metadataList = parseMetadata(t, stream, metadata);
+            metadataList = parse(t, stream, metadata);
         } catch (SecurityException e) {
             LOG.error("security exception " + t.getId(), e);
             throw e;
@@ -378,17 +383,73 @@ public class PipesServer implements Runnable {
         exit(1);
     }
 
-    private List<Metadata> parseMetadata(FetchEmitTuple fetchEmitTuple, InputStream stream,
-                                         Metadata metadata) {
+    private List<Metadata> parse(FetchEmitTuple fetchEmitTuple, InputStream stream,
+                                 Metadata metadata) {
         HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
+        if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) {
+            return parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata);
+        } else {
+            return parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata);
+        }
+    }
+
+    private List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple,
+                                             HandlerConfig handlerConfig, InputStream stream,
+                                             Metadata metadata) {
+        ContentHandlerFactory contentHandlerFactory =
+                new BasicContentHandlerFactory(handlerConfig.getType(), handlerConfig.getWriteLimit());
+        ContentHandler handler = contentHandlerFactory.getNewContentHandler();
+        ParseContext parseContext = new ParseContext();
+        parseContext.set(DocumentSelector.class, new DocumentSelector() {
+            final int maxEmbedded = handlerConfig.maxEmbeddedResources;
+            int embedded = 0;
+            @Override
+            public boolean select(Metadata metadata) {
+                if (maxEmbedded < 0) {
+                    return true;
+                }
+                return embedded++ < maxEmbedded; //select only while under the limit
+            }
+        });
+
+        String containerException = null;
+        try {
+            autoDetectParser.parse(stream, handler, metadata, parseContext);
+        } catch (SAXException e) {
+            containerException = ExceptionUtils.getStackTrace(e);
+            LOG.warn("sax problem:" + fetchEmitTuple.getId(), e);
+        } catch (EncryptedDocumentException e) {
+            containerException = ExceptionUtils.getStackTrace(e);
+            LOG.warn("encrypted document:" + fetchEmitTuple.getId(), e);
+        } catch (SecurityException e) {
+            LOG.warn("security exception:" + fetchEmitTuple.getId(), e);
+            throw e;
+        } catch (Exception e) {
+            containerException = ExceptionUtils.getStackTrace(e);
+            LOG.warn("exception: " + fetchEmitTuple.getId(), e);
+        } finally {
+            metadata.add(TikaCoreProperties.TIKA_CONTENT, handler.toString());
+            if (containerException != null) {
+                metadata.add(TikaCoreProperties.CONTAINER_EXCEPTION, containerException);
+            }
+            try {
+                tikaConfig.getMetadataFilter().filter(metadata);
+            } catch (TikaException e) {
+                LOG.warn("exception mapping metadata", e);
+            }
+        }
+        return Collections.singletonList(metadata);
+    }
+
+    private List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple,
+                                          HandlerConfig handlerConfig, InputStream stream,
+                                          Metadata metadata) {
         RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
-                new BasicContentHandlerFactory(handlerConfig.getType(),
-                    handlerConfig.getWriteLimit()),
-                handlerConfig.getMaxEmbeddedResources(),
-                tikaConfig.getMetadataFilter());
+                new BasicContentHandlerFactory(handlerConfig.getType(), handlerConfig.getWriteLimit()),
+                handlerConfig.getMaxEmbeddedResources(), tikaConfig.getMetadataFilter());
         ParseContext parseContext = new ParseContext();
         try {
-            parser.parse(stream, handler, metadata, parseContext);
+            rMetaParser.parse(stream, handler, metadata, parseContext);
         } catch (SAXException e) {
             LOG.warn("sax problem:" + fetchEmitTuple.getId(), e);
         } catch (EncryptedDocumentException e) {
@@ -443,16 +504,10 @@ public class PipesServer implements Runnable {
         this.tikaConfig = new TikaConfig(tikaConfigPath);
         this.fetcherManager = FetcherManager.load(tikaConfigPath);
         this.emitterManager = EmitterManager.load(tikaConfigPath);
-        Parser autoDetectParser = new AutoDetectParser(this.tikaConfig);
-        this.parser = new RecursiveParserWrapper(autoDetectParser);
-
+        this.autoDetectParser = new AutoDetectParser(this.tikaConfig);
+        this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
     }
 
-    private static class FetchException extends IOException {
-        FetchException(Throwable t) {
-            super(t);
-        }
-    }
 
     private void write(EmitData emitData, String stack) {
         //TODO -- what do we do with the stack?
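
Outside the pipes framework, the new CONCATENATE path boils down to plain
AutoDetectParser usage with a single content handler. A self-contained sketch,
assuming only a local test file; the classes and calls mirror
parseConcatenated() above:

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.xml.sax.ContentHandler;

    import org.apache.tika.extractor.DocumentSelector;
    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.parser.AutoDetectParser;
    import org.apache.tika.parser.ParseContext;
    import org.apache.tika.sax.BasicContentHandlerFactory;

    public class ConcatenateDemo {
        public static void main(String[] args) throws Exception {
            ContentHandler handler = new BasicContentHandlerFactory(
                    BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1).getNewContentHandler();
            Metadata metadata = new Metadata();
            ParseContext context = new ParseContext();
            //a DocumentSelector can cap how many embedded docs are parsed;
            //here we accept all of them
            context.set(DocumentSelector.class, m -> true);
            try (InputStream stream = Files.newInputStream(Paths.get("test.docx"))) {
                new AutoDetectParser().parse(stream, handler, metadata, context);
            }
            //one content blob and one metadata object for the whole container
            System.out.println(handler);
        }
    }
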
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 926f8eb..a717f62 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -48,13 +48,13 @@ public class AsyncProcessor implements Closeable {
 
     static final int PARSER_FUTURE_CODE = 1;
 
-    static final AtomicLong TOTAL_PROCESSED = new AtomicLong(0);
 
     private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
     private final ArrayBlockingQueue<EmitData> emitData;
     private final ExecutorCompletionService<Integer> executorCompletionService;
     private final ExecutorService executorService;
     private final AsyncConfig asyncConfig;
+    private final AtomicLong totalProcessed = new AtomicLong(0);
     private int numParserThreadsFinished = 0;
     private boolean addedEmitterSemaphores = false;
     private int finished = 0;
@@ -165,7 +165,7 @@ public class AsyncProcessor implements Closeable {
     }
 
     public long getTotalProcessed() {
-        return TOTAL_PROCESSED.get();
+        return totalProcessed.get();
     }
 
     private class FetchEmitWorker implements Callable<Integer> {
@@ -203,7 +203,7 @@ public class AsyncProcessor implements Closeable {
                             //TODO -- add timeout, this currently hangs forever
                             emitDataQueue.offer(result.getEmitData());
                         }
-                        TOTAL_PROCESSED.incrementAndGet();
+                        totalProcessed.incrementAndGet();
                     }
                     checkActive();
                 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
index dfb5bf4..4227274 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
@@ -70,6 +70,9 @@ public abstract class PipesIterator extends ConfigBase
             FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
     private BasicContentHandlerFactory.HANDLER_TYPE handlerType =
             BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+
+    private HandlerConfig.PARSE_MODE parseMode = HandlerConfig.PARSE_MODE.RMETA;
+
     private int writeLimit = -1;
     private int maxEmbeddedResources = -1;
 
@@ -144,10 +147,19 @@ public abstract class PipesIterator extends ConfigBase
     }
 
     @Field
-    void setMaxEmbeddedResources(int maxEmbeddedResources) {
+    public void setMaxEmbeddedResources(int maxEmbeddedResources) {
         this.maxEmbeddedResources = maxEmbeddedResources;
     }
 
+    @Field
+    public void setParseMode(String parseModeString) {
+        setParseMode(HandlerConfig.PARSE_MODE.parseMode(parseModeString));
+    }
+
+    public void setParseMode(HandlerConfig.PARSE_MODE parseMode) {
+        this.parseMode = parseMode;
+    }
+
     public Integer call() throws Exception {
         enqueue();
         tryToAdd(COMPLETED_SEMAPHORE);
@@ -155,7 +167,7 @@ public abstract class PipesIterator extends ConfigBase
     }
 
     protected HandlerConfig getHandlerConfig() {
-        return new HandlerConfig(handlerType, writeLimit, maxEmbeddedResources);
+        return new HandlerConfig(handlerType, parseMode, writeLimit, maxEmbeddedResources);
     }
 
     protected abstract void enqueue() throws IOException, TimeoutException, InterruptedException;
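
For the new plumbing above, a minimal, hypothetical iterator that enqueues a
single fixed file; the helper methods and the six-arg FetchEmitTuple
constructor are as used by the file-list and filesystem iterators in this
commit:

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    import org.apache.tika.metadata.Metadata;
    import org.apache.tika.pipes.FetchEmitTuple;
    import org.apache.tika.pipes.emitter.EmitKey;
    import org.apache.tika.pipes.fetcher.FetchKey;
    import org.apache.tika.pipes.pipesiterator.PipesIterator;

    public class SingleFilePipesIterator extends PipesIterator {
        @Override
        protected void enqueue() throws IOException, TimeoutException, InterruptedException {
            String path = "docs/example.docx"; //illustrative
            tryToAdd(new FetchEmitTuple(path,
                    new FetchKey(getFetcherName(), path),
                    new EmitKey(getEmitterName(), path),
                    new Metadata(),
                    getHandlerConfig(), //carries handler type, parseMode and limits
                    getOnParseException()));
        }
    }
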
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
index 7d1c0db..90cabe8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
@@ -29,6 +29,7 @@ import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
@@ -66,7 +67,8 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
                 if (! line.startsWith("#") && !StringUtils.isBlank(line)) {
                     FetchKey fetchKey = new FetchKey(getFetcherName(), line);
                     EmitKey emitKey = new EmitKey(getEmitterName(), line);
-                    tryToAdd(new FetchEmitTuple(line, fetchKey, emitKey));
+                    tryToAdd(new FetchEmitTuple(line, fetchKey, emitKey,
+                            new Metadata(), getHandlerConfig(), getOnParseException()));
                 }
                 line = reader.readLine();
             }
@@ -83,6 +85,7 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
     public void setHasHeader(boolean hasHeader) {
         this.hasHeader = hasHeader;
     }
+
     @Override
     public void checkInitialization(InitializableProblemHandler problemHandler)
             throws TikaConfigException {
@@ -97,6 +100,4 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
                     "Must specify an existing file");
         }
     }
-
-
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
index d1a1923..35f424a 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
@@ -104,7 +104,8 @@ public class FileSystemPipesIterator extends PipesIterator implements Initializa
 
             try {
                 tryToAdd(new FetchEmitTuple(relPath, new FetchKey(fetcherName, relPath),
-                        new EmitKey(emitterName, relPath), new Metadata()));
+                        new EmitKey(emitterName, relPath), new Metadata(), getHandlerConfig(),
+                        getOnParseException()));
             } catch (TimeoutException e) {
                 throw new IOException(e);
             } catch (InterruptedException e) {
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java
index f6dfb43..17f2435 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java
@@ -17,6 +17,7 @@
 package org.apache.tika.pipes.opensearch.tests;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -27,9 +28,10 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.regex.Matcher;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.commons.io.IOUtils;
 import org.jetbrains.annotations.NotNull;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -39,20 +41,20 @@ import org.testcontainers.utility.DockerImageName;
 
 import org.apache.tika.cli.TikaCLI;
 import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.opensearch.JsonResponse;
 import org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter;
 
 public class TikaPipesOpenSearchTest {
 
-    private static final String COLLECTION = "testcol";
+    private static final String TEST_INDEX = "tika-pipes-index";
     private static final File TEST_FILE_FOLDER = new File("target", "test-files");
-    private final int numHtmlDocs = 42;
     private int numTestDocs = 0;
     protected GenericContainer<?> openSearch;
     private String openSearchHost;
     private int openSearchPort;
-    //this includes the collection, e.g. https://localhost:49213/testcol
-    private String openSearchEndpoint;
+    //this is the base url without the index, e.g. https://localhost:49213/
+    private String openSearchEndpointBase;
     private OpenSearchTestClient client;
 
     @Rule
@@ -70,22 +72,164 @@ public class TikaPipesOpenSearchTest {
         setupOpenSearch(openSearchContainer);
     }
 
-    @AfterClass
-    public static void tearDown() throws Exception {
+    @After
+    public void tearDown() throws Exception {
         FileUtils.deleteDirectory(TEST_FILE_FOLDER);
     }
 
     @Test
-    public void testFSToOpenSearch() throws Exception {
+    public void testBasicFSToOpenSearch() throws Exception {
+        int numHtmlDocs = 42;
+        createTestHtmlFiles("Happiness", numHtmlDocs);
+
+        String endpoint = openSearchEndpointBase + TEST_INDEX;
+        sendMappings(endpoint, TEST_INDEX, "opensearch-mappings.json");
+
+        runPipes(OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                HandlerConfig.PARSE_MODE.CONCATENATE, endpoint);
+
+        String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+                "\"query\": \"happiness\" } } } }";
+
+        JsonResponse results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + numTestDocs,
+                results.getJson().get("hits").get("total").get("value").asInt());
+
+        //now try match all
+        query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} } }";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + numTestDocs,
+                results.getJson().get("hits").get("total").get("value").asInt());
+    }
+
+    @Test
+    public void testParentChildFSToOpenSearch() throws Exception {
+        int numHtmlDocs = 42;
+        createTestHtmlFiles("Happiness", numHtmlDocs);
+        String endpoint = openSearchEndpointBase + TEST_INDEX;
+        sendMappings(endpoint, TEST_INDEX, "opensearch-parent-child-mappings.json");
+
+        runPipes(OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD,
+                HandlerConfig.PARSE_MODE.RMETA, endpoint);
+
+        String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+                "\"query\": \"happiness\" } } } }";
+
+        JsonResponse results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + numTestDocs,
+              results.getJson().get("hits").get("total").get("value").asInt());
+
+        //now try match all
+        query = "{ " +
+                //"\"from\":0, \"size\":1000," +
+                "\"track_total_hits\": true, \"query\": { " +
+                "\"match_all\": {} } }";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+                results.getJson().get("hits").get("total").get("value").asInt());
+
+        //now check out one of the embedded files
+        query = "{ \"track_total_hits\": true, \"query\": { \"query_string\": { " +
+                "\"default_field\": \"content\",  " +
+                "\"query\": \"embed4 zip\" , \"minimum_should_match\":2 } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(1,
+                results.getJson().get("hits").get("total").get("value").asInt());
+        JsonNode source = results.getJson().get("hits").get("hits").get(0).get("_source");
+
+        assertEquals("test_recursive_embedded.docx-9",
+                results.getJson().get("hits").get("hits").get(0).get("_id").asText());
+        assertEquals("test_recursive_embedded.docx",
+                results.getJson().get("hits").get("hits").get(0).get("_routing").asText());
+        assertEquals("test_recursive_embedded.docx",
+                source.get("relation_type").get("parent").asText());
+        assertEquals("embedded",
+                source.get("relation_type").get("name").asText());
+
+        assertEquals("application/zip", source.get("mime").asText());
+
+        //now make sure all the children are returned by a parent search
+        query = "{ \"track_total_hits\": true, \"query\": { \"parent_id\": { " +
+                "\"type\": \"embedded\",  " +
+                "\"id\": \"test_recursive_embedded.docx\" } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(11,
+                results.getJson().get("hits").get("total").get("value").asInt());
+    }
+
+
+    @Test
+    public void testSeparateDocsFSToOpenSearch() throws Exception {
+        int numHtmlDocs = 42;
+        createTestHtmlFiles("Happiness", numHtmlDocs);
+        String endpoint = openSearchEndpointBase + TEST_INDEX;
+        sendMappings(endpoint, TEST_INDEX, "opensearch-mappings.json");
+
+        runPipes(OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS,
+                HandlerConfig.PARSE_MODE.RMETA, endpoint);
+
+        String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+                "\"query\": \"happiness\" } } } }";
+
+        JsonResponse results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + numTestDocs,
+                results.getJson().get("hits").get("total").get("value").asInt());
+
+        //now try match all
+        query = "{ " +
+                //"\"from\":0, \"size\":1000," +
+                "\"track_total_hits\": true, \"query\": { " +
+                "\"match_all\": {} } }";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+                results.getJson().get("hits").get("total").get("value").asInt());
+
+        //now check out one of the embedded files
+        query = "{ \"track_total_hits\": true, \"query\": { \"query_string\": { " +
+                "\"default_field\": \"content\",  " +
+                "\"query\": \"embed4 zip\" , \"minimum_should_match\":2 } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(200, results.getStatus());
+        assertEquals(1,
+                results.getJson().get("hits").get("total").get("value").asInt());
+        JsonNode source = results.getJson().get("hits").get("hits").get(0).get("_source");
+
+        assertEquals("test_recursive_embedded.docx-9",
+                results.getJson().get("hits").get("hits").get(0).get("_id").asText());
+        assertNull("test_recursive_embedded.docx",
+                results.getJson().get("hits").get("hits").get(0).get("_routing"));
+        assertNull("test_recursive_embedded.docx",
+                source.get("relation_type"));
+
+        assertEquals("application/zip", source.get("mime").asText());
+
+        //now make sure there are no children; this query should
+        //cause an exception because there are no relationships in the schema
+        query = "{ \"track_total_hits\": true, \"query\": { \"parent_id\": { " +
+                "\"type\": \"embedded\",  " +
+                "\"id\": \"test_recursive_embedded.docx\" } } } ";
+        results = client.postJson(endpoint + "/_search", query);
+        assertEquals(400, results.getStatus());
+    }
+
+
+    private void sendMappings(String endpoint, String index, String mappingsFile) throws Exception {
         //create the collection with mappings
         String mappings = IOUtils.toString(TikaPipesOpenSearchTest.class.getResourceAsStream(
-                "/opensearch/opensearch-mappings.json"), StandardCharsets.UTF_8);
+                "/opensearch/" + mappingsFile), StandardCharsets.UTF_8);
         int status = -1;
         int tries = 0;
         JsonResponse response = null;
         //need to wait a bit sometimes before OpenSearch is up
         while (status != 200 && tries++ < 20) {
-            response = client.putJson(openSearchEndpoint, mappings);
+            response = client.putJson(endpoint, mappings);
             if (status != 200) {
                 Thread.sleep(1000);
             }
@@ -96,31 +240,12 @@ public class TikaPipesOpenSearchTest {
                     response);
         }
         assertTrue(response.getJson().get("acknowledged").asBoolean());
-        assertEquals("testcol", response.getJson().get("index").asText());
-
-        runPipes(OpenSearchEmitter.AttachmentStrategy.SKIP);
-        //refresh to make sure the content is searchable
-        JsonResponse refresh = client.getJson(openSearchEndpoint + "/_refresh");
-
-        String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
-                "\"query\": \"happiness\" } } } }";
-
-        JsonResponse results = client.postJson(openSearchEndpoint + "/_search", query);
-        assertEquals(200, results.getStatus());
-        //assertEquals(numHtmlDocs + numTestDocs,
-          //      results.getJson().get("hits").get("total").get("value").asInt());
-
-        //now try match all
-        query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} } }";
-        results = client.postJson(openSearchEndpoint + "/_search", query);
-        assertEquals(200, results.getStatus());
-        assertEquals(numHtmlDocs + numTestDocs,
-                results.getJson().get("hits").get("total").get("value").asInt());
-
+        assertEquals(index, response.getJson().get("index").asText());
 
     }
 
-    private void runPipes(OpenSearchEmitter.AttachmentStrategy attachmentStrategy) throws Exception {
+    private void runPipes(OpenSearchEmitter.AttachmentStrategy attachmentStrategy,
+                          HandlerConfig.PARSE_MODE parseMode, String endpoint) throws Exception {
 
         File tikaConfigFile = new File("target", "ta-opensearch.xml");
         File log4jPropFile = new File("target", "tmp-log4j2.xml");
@@ -136,45 +261,48 @@ public class TikaPipesOpenSearchTest {
 
         String tikaConfigXml =
                 createTikaConfigXml(tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
-                        attachmentStrategy);
+                        attachmentStrategy, parseMode, endpoint);
         FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
 
         TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
 
+        //refresh to make sure the content is searchable
+        JsonResponse refresh = client.getJson(endpoint + "/_refresh");
 
     }
 
     @NotNull
     private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile, String tikaConfigTemplateXml,
-                                       OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
+                                       OpenSearchEmitter.AttachmentStrategy attachmentStrategy,
+                                       HandlerConfig.PARSE_MODE parseMode, String endpoint) {
         String res =
                 tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
                         .replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
                         .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
                         .replaceAll("\\{PATH_TO_DOCS\\}", 
-                                Matcher.quoteReplacement(TEST_FILE_FOLDER.getAbsolutePath()));
+                                Matcher.quoteReplacement(TEST_FILE_FOLDER.getAbsolutePath()))
+                        .replace("{PARSE_MODE}", parseMode.name());
 
-        res = res.replace("{OPENSEARCH_CONNECTION}", openSearchEndpoint);
+        res = res.replace("{OPENSEARCH_CONNECTION}", endpoint);
 
         return res;
 
     }
 
     private void setupOpenSearch(GenericContainer<?> openSearchContainer) throws Exception {
-        createTestHtmlFiles("Happiness");
         this.openSearch = openSearchContainer;
         openSearchHost = openSearch.getHost();
         openSearchPort = openSearch.getMappedPort(9200);
-        openSearchEndpoint = "https://" + openSearchHost + ":" + openSearchPort + "/" + COLLECTION;
+        openSearchEndpointBase = "https://" + openSearchHost + ":" + openSearchPort + "/";
         HttpClientFactory httpClientFactory = new HttpClientFactory();
         httpClientFactory.setUserName("admin");
         httpClientFactory.setPassword("admin");
         //attachment strategy is not used here...TODO clean this up
-        client = new OpenSearchTestClient(openSearchEndpoint,
-                httpClientFactory.build(), OpenSearchEmitter.AttachmentStrategy.SKIP);
+        client = new OpenSearchTestClient(openSearchEndpointBase,
+                httpClientFactory.build(), OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS);
     }
 
-    private void createTestHtmlFiles(String bodyContent) throws Exception {
+    private void createTestHtmlFiles(String bodyContent, int numHtmlDocs) throws Exception {
         TEST_FILE_FOLDER.mkdirs();
         for (int i = 0; i < numHtmlDocs; ++i) {
             FileUtils.writeStringToFile(new File(TEST_FILE_FOLDER, "test-" + i + ".html"),
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
index 42a2bc3..af0b53b 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
@@ -65,12 +65,11 @@
       <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
       <emitWithinMillis>60000</emitWithinMillis>
       <numEmitters>1</numEmitters>
-      <numClients>1</numClients>
+      <numClients>3</numClients>
       <tikaConfig>{TIKA_CONFIG}</tikaConfig>
       <forkedJvmArgs>
-        <arg>-Xmx1g</arg>
+        <arg>-Xmx512m</arg>
         <arg>-XX:ParallelGCThreads=2</arg>
-        <arg>-XX:+ExitOnOutOfMemoryError</arg>
         <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
       </forkedJvmArgs>
       <timeoutMillis>60000</timeoutMillis>
@@ -93,7 +92,6 @@
         <updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
         -->
         <attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
-        <contentField>content</contentField>
         <commitWithin>10</commitWithin>
         <idField>_id</idField>
         <connectionTimeout>10000</connectionTimeout>
@@ -108,6 +106,7 @@
       <basePath>{PATH_TO_DOCS}</basePath>
       <fetcherName>fsf</fetcherName>
       <emitterName>ose</emitterName>
+      <parseMode>{PARSE_MODE}</parseMode>
     </params>
   </pipesIterator>
 </properties>
diff --git a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
index 5083ff0..3810b3d 100644
--- a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
+++ b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
@@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
 import org.apache.solr.common.SolrInputDocument;
 import org.jetbrains.annotations.NotNull;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -35,6 +36,7 @@ import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
 import org.testcontainers.utility.DockerImageName;
 
 import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.solr.SolrEmitter;
 
 public abstract class TikaPipesSolrTestBase {
@@ -62,6 +64,11 @@ public abstract class TikaPipesSolrTestBase {
         setupSolr(solrContainer);
     }
 
+    @After
+    public void tearDown() throws Exception {
+        FileUtils.deleteDirectory(testFileFolder);
+    }
+
     @Test
     public void testFetchIteratorWithSolrUrls() throws Exception {
         runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter();
@@ -120,7 +127,8 @@ public abstract class TikaPipesSolrTestBase {
         String tikaConfigXml =
                 createTikaConfigXml(useZk(), tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
                         SolrEmitter.UpdateStrategy.ADD,
-                        SolrEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+                        SolrEmitter.AttachmentStrategy.PARENT_CHILD,
+                        HandlerConfig.PARSE_MODE.CONCATENATE);
         FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
 
         TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
@@ -142,7 +150,8 @@ public abstract class TikaPipesSolrTestBase {
         tikaConfigXml =
                 createTikaConfigXml(useZk(), tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
                         SolrEmitter.UpdateStrategy.UPDATE_MUST_EXIST,
-                        SolrEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+                        SolrEmitter.AttachmentStrategy.PARENT_CHILD,
+                        HandlerConfig.PARSE_MODE.CONCATENATE);
         FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
 
         TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
@@ -163,13 +172,15 @@ public abstract class TikaPipesSolrTestBase {
     private String createTikaConfigXml(boolean useZk, File tikaConfigFile, File log4jPropFile,
                                        String tikaConfigTemplateXml,
                                        SolrEmitter.UpdateStrategy updateStrategy,
-                                       SolrEmitter.AttachmentStrategy attachmentStrategy) {
+                                       SolrEmitter.AttachmentStrategy attachmentStrategy,
+                                       HandlerConfig.PARSE_MODE parseMode) {
         String res =
                 tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
                         .replace("{UPDATE_STRATEGY}", updateStrategy.toString())
                         .replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
                         .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
-                        .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath());
+                        .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath())
+                        .replace("{PARSE_MODE}", parseMode.name());
         if (useZk) {
             res = res.replace("{SOLR_CONNECTION}",
                     "<solrZkHosts>\n" + "        <solrZkHost>" + solrHost + ":" + zkPort +
diff --git a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml
index 121185b..5f2740f 100644
--- a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml
+++ b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml
@@ -92,7 +92,6 @@
         <updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
         <solrCollection>testcol</solrCollection>
         <attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
-        <contentField>content</contentField>
         <commitWithin>10</commitWithin>
         <idField>id</idField>
         <connectionTimeout>10000</connectionTimeout>
@@ -106,7 +105,7 @@
       </params>
     </emitter>
   </emitters>
-  <pipesIterator class="org.apache.tika.pipes.solrtest.SolrPipesIterator">
+  <pipesIterator class="org.apache.tika.pipes.pipesiterator.solr.SolrPipesIterator">
     <params>
       <solrCollection>testcol</solrCollection>
       {SOLR_CONNECTION}
@@ -114,6 +113,7 @@
       <parsingIdField>parsing_id_i</parsingIdField>
       <failCountField>fail_count_i</failCountField>
       <sizeFieldName>size_i</sizeFieldName>
+      <parseMode>{PARSE_MODE}</parseMode>
       <rows>10</rows>
       <fetcherName>fsf</fetcherName>
       <emitterName>se</emitterName>
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
index ab348ad..face044 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -113,6 +113,8 @@ public class OpenSearchClient {
                 jsonGenerator.writeStringField("parent", emitKey);
                 //end the relation type object
                 jsonGenerator.writeEndObject();
+            } else if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS) {
+                jsonGenerator.writeStringField("parent", emitKey);
             }
             //end the metadata object
             jsonGenerator.writeEndObject();
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
index 5f61814..208f138 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
@@ -19,7 +19,6 @@ package org.apache.tika.pipes.emitter.opensearch;
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -43,7 +42,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
 
 
     public enum AttachmentStrategy {
-        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+        SEPARATE_DOCUMENTS, PARENT_CHILD,
         //anything else?
     }
 
@@ -51,7 +50,6 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
     private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
 
     private String openSearchUrl = null;
-    private String contentField = "content";
     private String idField = "_id";
     private int commitWithin = 1000;
     private OpenSearchClient openSearchClient;
@@ -69,144 +67,27 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
             return;
         }
         try {
-            if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
-                metadataList = concatenate(metadataList);
-            } else if (attachmentStrategy == AttachmentStrategy.SKIP) {
-                metadataList = Collections.singletonList(metadataList.get(0));
-            }
             openSearchClient.addDocument(emitKey, metadataList);
         } catch (TikaClientException e) {
             throw new TikaEmitterException("failed to add document", e);
         }
     }
 
-    private List<Metadata> concatenate(List<Metadata> metadataList) {
-        if (metadataList.size() == 1) {
-            return metadataList;
-        }
-
-        Metadata ret = metadataList.get(0);
-        StringBuilder content = new StringBuilder();
-        for (Metadata m : metadataList) {
-            String c = m.get(getContentField());
-            if (! StringUtils.isBlank(c)) {
-                content.append(c).append("\n");
-            }
-        }
-        ret.set(getContentField(), content.toString());
-        return Collections.singletonList(ret);
-
-    }
-/*
-    private void addMetadataAsSolrInputDocuments(String emitKey, List<Metadata> metadataList,
-                                                 List<SolrInputDocument> docsToUpdate)
-            throws IOException, TikaEmitterException {
-        SolrInputDocument solrInputDocument = new SolrInputDocument();
-        solrInputDocument.setField(idField, emitKey);
-        if (updateStrategy == UpdateStrategy.UPDATE_MUST_EXIST) {
-            solrInputDocument.setField("_version_", 1);
-        } else if (updateStrategy == UpdateStrategy.UPDATE_MUST_NOT_EXIST) {
-            solrInputDocument.setField("_version_", -1);
-        }
-        if (attachmentStrategy == AttachmentStrategy.SKIP || metadataList.size() == 1) {
-            addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
-        } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
-            //this only handles text for now, not xhtml
-            StringBuilder sb = new StringBuilder();
-            for (Metadata metadata : metadataList) {
-                String content = metadata.get(getContentField());
-                if (content != null) {
-                    sb.append(content).append("\n");
-                }
-            }
-            Metadata parent = metadataList.get(0);
-            parent.set(getContentField(), sb.toString());
-            addMetadataToSolrInputDocument(parent, solrInputDocument, updateStrategy);
-        } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
-            addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
-            for (int i = 1; i < metadataList.size(); i++) {
-                SolrInputDocument childSolrInputDocument = new SolrInputDocument();
-                Metadata m = metadataList.get(i);
-                childSolrInputDocument.setField(idField, UUID.randomUUID().toString());
-                addMetadataToSolrInputDocument(m, childSolrInputDocument, updateStrategy);
-            }
-        } else {
-            throw new IllegalArgumentException(
-                    "I don't yet support this attachment strategy: " + attachmentStrategy);
-        }
-        docsToUpdate.add(solrInputDocument);
-    }
-
-    @Override
-    public void emit(List<? extends EmitData> batch) throws IOException, TikaEmitterException {
-        if (batch == null || batch.size() == 0) {
-            LOG.warn("batch is null or empty");
-            return;
-        }
-        List<SolrInputDocument> docsToUpdate = new ArrayList<>();
-        for (EmitData d : batch) {
-            addMetadataAsSolrInputDocuments(d.getEmitKey().getEmitKey(), d.getMetadataList(),
-                    docsToUpdate);
-        }
-        emitSolrBatch(docsToUpdate);
-    }
-
-    private void emitSolrBatch(List<SolrInputDocument> docsToUpdate)
-            throws IOException, TikaEmitterException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Emitting solr doc batch: {}", docsToUpdate);
-        }
-        if (!docsToUpdate.isEmpty()) {
-            try {
-                UpdateRequest req = new UpdateRequest();
-                req.add(docsToUpdate);
-                req.setCommitWithin(commitWithin);
-                req.setParam("failOnVersionConflicts", "false");
-                req.process(openSearchClient, solrCollection);
-            } catch (Exception e) {
-                throw new TikaEmitterException("Could not add batch to solr", e);
-            }
-        }
-    }
-
-    private void addMetadataToSolrInputDocument(Metadata metadata,
-                                                SolrInputDocument solrInputDocument,
-                                                UpdateStrategy updateStrategy) {
-        for (String n : metadata.names()) {
-            String[] vals = metadata.getValues(n);
-            if (vals.length == 0) {
-                continue;
-            } else if (vals.length == 1) {
-                if (updateStrategy == UpdateStrategy.ADD) {
-                    solrInputDocument.setField(n, vals[0]);
-                } else {
-                    solrInputDocument.setField(n, new HashMap<String, String>() {{
-                            put("set", vals[0]);
-                        }
-                    });
-                }
-            } else if (vals.length > 1) {
-                if (updateStrategy == UpdateStrategy.ADD) {
-                    solrInputDocument.setField(n, vals);
-                } else {
-                    solrInputDocument.setField(n, new HashMap<String, String[]>() {{
-                            put("set", vals);
-                        }
-                    });
-                }
-            }
-        }
-    }*/
 
     /**
-     * Options: SKIP, CONCATENATE_CONTENT, PARENT_CHILD. Default is "PARENT_CHILD".
-     * If set to "SKIP", this will index only the main file and ignore all info
-     * in the attachments.  If set to "CONCATENATE_CONTENT", this will concatenate the
-     * content extracted from the attachments into the main document and
-     * then index the main document with the concatenated content _and_ the
-     * main document's metadata (metadata from attachments will be thrown away).
-     * If set to "PARENT_CHILD", this will index the attachments as children
-     * of the parent document via OpenSearch's parent-child relationship.
+     * Options: SEPARATE_DOCUMENTS, PARENT_CHILD. Default is "PARENT_CHILD".
+     * With SEPARATE_DOCUMENTS, all embedded documents are treated as
+     * independent documents.
+     * PARENT_CHILD requires a schema to be set up for the relationship type;
+     * all embedded objects (no matter how deeply nested) will have a single
+     * parent of the main container document.
+     *
+     * If you want to concatenate the content of embedded files and ignore
+     * the metadata of embedded files, set
+     * {@link org.apache.tika.pipes.HandlerConfig}'s parseMode to
+     * {@link org.apache.tika.pipes.HandlerConfig.PARSE_MODE#CONCATENATE}
+     * in your {@link org.apache.tika.pipes.FetchEmitTuple} or in the
+     * &lt;parseMode&gt; element in your {@link org.apache.tika.pipes.pipesiterator.PipesIterator}
+     * configuration.
      */
     @Field
     public void setAttachmentStrategy(String attachmentStrategy) {
@@ -224,24 +105,6 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
         httpClientFactory.setSocketTimeout(socketTimeout);
     }
 
-    public String getContentField() {
-        return contentField;
-    }
-
-    /**
-     * This is the field _after_ metadata mappings have been applied
-     * that contains the "content" for each metadata object.
-     * <p>
-     * This is the field that is used if {@link #attachmentStrategy}
-     * is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
-     *
-     * @param contentField
-     */
-    @Field
-    public void setContentField(String contentField) {
-        this.contentField = contentField;
-    }
-
     public int getCommitWithin() {
         return commitWithin;
     }
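
With the concatenation logic removed, strategy selection is the only knob left
on this emitter; concatenation is requested upstream. A short sketch (the
setter is from this commit; the pairing of values is illustrative):

    OpenSearchEmitter emitter = new OpenSearchEmitter();
    emitter.setAttachmentStrategy("PARENT_CHILD"); //or "SEPARATE_DOCUMENTS"
    //to emulate the removed CONCATENATE_CONTENT strategy, set
    //<parseMode>concatenate</parseMode> on the pipes iterator instead
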
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 534e899..a1388e0 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -91,20 +91,9 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         } else if (updateStrategy == UpdateStrategy.UPDATE_MUST_NOT_EXIST) {
             solrInputDocument.setField("_version_", -1);
         }
-        if (attachmentStrategy == AttachmentStrategy.SKIP || metadataList.size() == 1) {
+        if (metadataList.size() == 1) {
             addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
-        } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
-            //this only handles text for now, not xhtml
-            StringBuilder sb = new StringBuilder();
-            for (Metadata metadata : metadataList) {
-                String content = metadata.get(getContentField());
-                if (content != null) {
-                    sb.append(content).append("\n");
-                }
-            }
-            Metadata parent = metadataList.get(0);
-            parent.set(getContentField(), sb.toString());
-            addMetadataToSolrInputDocument(parent, solrInputDocument, updateStrategy);
+            docsToUpdate.add(solrInputDocument);
         } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
             addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
             for (int i = 1; i < metadataList.size(); i++) {
@@ -112,12 +101,24 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
                 Metadata m = metadataList.get(i);
                 childSolrInputDocument.setField(idField, UUID.randomUUID().toString());
                 addMetadataToSolrInputDocument(m, childSolrInputDocument, updateStrategy);
+                solrInputDocument.addChildDocument(childSolrInputDocument);
+            }
+            docsToUpdate.add(solrInputDocument);
+        } else if (attachmentStrategy == AttachmentStrategy.SEPARATE_DOCUMENTS) {
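+            //emit the container and each embedded file as independent top-level docs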
+            addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
+            docsToUpdate.add(solrInputDocument);
+            for (int i = 1; i < metadataList.size(); i++) {
+                SolrInputDocument childSolrInputDocument = new SolrInputDocument();
+                Metadata m = metadataList.get(i);
+                childSolrInputDocument.setField(idField,
+                        solrInputDocument.getFieldValue(idField) + "-" + UUID.randomUUID());
+                addMetadataToSolrInputDocument(m, childSolrInputDocument, updateStrategy);
+                docsToUpdate.add(childSolrInputDocument);
             }
         } else {
             throw new IllegalArgumentException(
                     "I don't yet support this attachment strategy: " + attachmentStrategy);
         }
-        docsToUpdate.add(solrInputDocument);
     }
 
     @Override
@@ -211,24 +212,6 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.socketTimeout = socketTimeout;
     }
 
-    public String getContentField() {
-        return contentField;
-    }
-
-    /**
-     * This is the field _after_ metadata mappings have been applied
-     * that contains the "content" for each metadata object.
-     * <p>
-     * This is the field that is used if {@link #attachmentStrategy}
-     * is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
-     *
-     * @param contentField
-     */
-    @Field
-    public void setContentField(String contentField) {
-        this.contentField = contentField;
-    }
-
     public int getCommitWithin() {
         return commitWithin;
     }
@@ -326,7 +309,7 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
     }
 
     public enum AttachmentStrategy {
-        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+        SEPARATE_DOCUMENTS, PARENT_CHILD,
         //anything else?
     }
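
A sketch of the id scheme the new SEPARATE_DOCUMENTS branch above gives
embedded documents; the parent id value here is hypothetical:

    import java.util.UUID;

    public class ChildIdSketch {
        public static void main(String[] args) {
            //mirrors the derivation in the SEPARATE_DOCUMENTS branch
            String parentId = "path/to/container.docx";
            String childId = parentId + "-" + UUID.randomUUID();
            System.out.println(childId);
        }
    }

Because the suffix is random, re-emitting the same container produces new
child ids each time.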
 
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
similarity index 99%
rename from tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
index 85b070c..4cc50d0 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.solrtest;
+package org.apache.tika.pipes.pipesiterator.solr;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 53a4f2d..cb23b34 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -48,6 +48,7 @@ public class JsonFetchEmitTuple {
     private static final String HANDLER_CONFIG_TYPE = "type";
     private static final String HANDLER_CONFIG_WRITE_LIMIT = "writeLimit";
     private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources";
+    private static final String HANDLER_CONFIG_PARSE_MODE = "parseMode";
 
 
     public static FetchEmitTuple fromJson(Reader reader) throws IOException {
@@ -127,6 +128,7 @@ public class JsonFetchEmitTuple {
                 BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
         int writeLimit = -1;
         int maxEmbeddedResources = -1;
+        HandlerConfig.PARSE_MODE parseMode = HandlerConfig.PARSE_MODE.RMETA;
         String fieldName = jParser.nextFieldName();
         while (fieldName != null) {
             switch (fieldName) {
@@ -141,13 +143,17 @@ public class JsonFetchEmitTuple {
                 case HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES:
                     maxEmbeddedResources = jParser.nextIntValue(-1);
                     break;
+                case HANDLER_CONFIG_PARSE_MODE:
+                    String modeString = jParser.nextTextValue();
+                    parseMode = HandlerConfig.PARSE_MODE.parseMode(modeString);
+                    break;
                 default:
                     throw new IllegalArgumentException("I regret I don't understand '" + fieldName +
                                                        "' in the context of a handler config");
             }
             fieldName = jParser.nextFieldName();
         }
-        return new HandlerConfig(handlerType, writeLimit, maxEmbeddedResources);
+        return new HandlerConfig(handlerType, parseMode, writeLimit, maxEmbeddedResources);
     }
 
     private static String getValue(JsonParser jParser) throws IOException {
@@ -189,6 +195,8 @@ public class JsonFetchEmitTuple {
             jsonGenerator.writeStartObject();
             jsonGenerator.writeStringField(HANDLER_CONFIG_TYPE,
                     t.getHandlerConfig().getType().name().toLowerCase(Locale.ROOT));
+            jsonGenerator.writeStringField(HANDLER_CONFIG_PARSE_MODE,
+                    t.getHandlerConfig().getParseMode().name().toLowerCase(Locale.ROOT));
             jsonGenerator.writeNumberField(HANDLER_CONFIG_WRITE_LIMIT,
                     t.getHandlerConfig().getWriteLimit());
             jsonGenerator.writeNumberField(HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES,
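
For reference, the handler-config object that the reader and writer above
exchange looks roughly like this; the key names and the lower-cased enum
values come from the code in this diff, while the values themselves and
the JSON formatting are illustrative:

    {
        "type": "xml",
        "parseMode": "concatenate",
        "writeLimit": 10000,
        "maxEmbeddedResources": 10
    }

Note that an unrecognized key in this object is a hard failure: the reader
throws IllegalArgumentException rather than skipping it.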
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
index c335bbc..56ad551 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
@@ -44,7 +44,9 @@ public class JsonFetchEmitTupleTest {
 
         FetchEmitTuple t = new FetchEmitTuple("my_id", new FetchKey("my_fetcher", "fetchKey1"),
                 new EmitKey("my_emitter", "emitKey1"), m,
-                new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, 10000, 10),
+                new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML,
+                        HandlerConfig.PARSE_MODE.CONCATENATE,
+                        10000, 10),
                 FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(t, writer);
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
index cd43f1b..c001f0b 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
@@ -122,11 +122,12 @@ public class RecursiveMetadataResource {
         return Response
                 .ok(parseMetadataToMetadataList(att.getObject(InputStream.class), new Metadata(),
                         att.getHeaders(), info,
-                        buildHandlerConfig(att.getHeaders(), handlerTypeName))).build();
+                        buildHandlerConfig(att.getHeaders(), handlerTypeName,
+                                HandlerConfig.PARSE_MODE.RMETA))).build();
     }
 
     static HandlerConfig buildHandlerConfig(MultivaluedMap<String, String> httpHeaders,
-                                            String handlerTypeName) {
+                                            String handlerTypeName, HandlerConfig.PARSE_MODE parseMode) {
         int writeLimit = -1;
         if (httpHeaders.containsKey("writeLimit")) {
             writeLimit = Integer.parseInt(httpHeaders.getFirst("writeLimit"));
@@ -138,6 +139,7 @@ public class RecursiveMetadataResource {
         }
         return new HandlerConfig(
                 BasicContentHandlerFactory.parseHandlerType(handlerTypeName, DEFAULT_HANDLER_TYPE),
+                parseMode,
                 writeLimit, maxEmbeddedResources);
     }
 
@@ -176,7 +178,8 @@ public class RecursiveMetadataResource {
         return Response.ok(parseMetadataToMetadataList(
                 TikaResource.getInputStream(is, metadata, httpHeaders), metadata,
                 httpHeaders.getRequestHeaders(), info,
-                buildHandlerConfig(httpHeaders.getRequestHeaders(), handlerTypeName))).build();
+                buildHandlerConfig(httpHeaders.getRequestHeaders(), handlerTypeName,
+                        HandlerConfig.PARSE_MODE.RMETA))).build();
     }
 
     private MetadataList parseMetadataToMetadataList(InputStream is, Metadata metadata,
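
As the hunks above show, the HTTP endpoints pin parseMode to RMETA; clients
control only writeLimit and maxEmbeddedResources, via request headers. A
sketch of such a request, assuming a default tika-server on port 9998 and
its standard /rmeta/text endpoint (body omitted):

    PUT /rmeta/text HTTP/1.1
    Host: localhost:9998
    writeLimit: 100000
    maxEmbeddedResources: 10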
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
index bc7e824..b63e29d 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
@@ -215,7 +215,8 @@ public class TikaPipesTest extends CXFTestBase {
                         new FetchKey("fsf", "hello_world.xml"),
                         new EmitKey("fse", ""),
                         userMetadata,
-                        new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, -1, -1),
+                        new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML,
+                                HandlerConfig.PARSE_MODE.RMETA, -1, -1),
                         FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
         StringWriter writer = new StringWriter();
         JsonFetchEmitTuple.toJson(t, writer);