You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) was not preserved in this copy.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/22 20:26:57 UTC
[tika] branch main updated: TIKA-3494 -- allow legacy concatenation
of content via standard AutoDetectParser instead of RecursiveParserWrapper,
numerous other bug fixes
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new a8f8bbc TIKA-3494 -- allow legacy concatenation of content via standard AutoDetectParser instead of RecursiveParserWrapper, numerous other bug fixes
a8f8bbc is described below
commit a8f8bbc714ace8001b62cab22e70f9054c09e96a
Author: tallison <ta...@apache.org>
AuthorDate: Thu Jul 22 16:26:35 2021 -0400
TIKA-3494 -- allow legacy concatenation of content via standard AutoDetectParser instead of RecursiveParserWrapper, numerous other bug fixes
---
CHANGES.txt | 4 +
.../java/org/apache/tika/config/ConfigBase.java | 1 -
.../java/org/apache/tika/pipes/HandlerConfig.java | 62 +++++-
.../java/org/apache/tika/pipes/PipesServer.java | 89 +++++++--
.../apache/tika/pipes/async/AsyncProcessor.java | 6 +-
.../tika/pipes/pipesiterator/PipesIterator.java | 16 +-
.../filelist/FileListPipesIterator.java | 7 +-
.../pipesiterator/fs/FileSystemPipesIterator.java | 3 +-
.../opensearch/tests/TikaPipesOpenSearchTest.java | 208 +++++++++++++++++----
.../opensearch/tika-config-opensearch.xml | 7 +-
.../pipes/solr/tests/TikaPipesSolrTestBase.java | 19 +-
.../src/test/resources/tika-config-solr-urls.xml | 4 +-
.../pipes/emitter/opensearch/OpenSearchClient.java | 2 +
.../emitter/opensearch/OpenSearchEmitter.java | 165 ++--------------
.../tika/pipes/emitter/solr/SolrEmitter.java | 49 ++---
.../solr}/SolrPipesIterator.java | 2 +-
.../metadata/serialization/JsonFetchEmitTuple.java | 10 +-
.../serialization/JsonFetchEmitTupleTest.java | 4 +-
.../core/resource/RecursiveMetadataResource.java | 9 +-
.../org/apache/tika/server/core/TikaPipesTest.java | 3 +-
20 files changed, 394 insertions(+), 276 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2a82d28..1d7818f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,9 @@
Release 2.0.1 - ???
+ * Breaking change in the Solr and OpenSearch emitters. To achieve
+ the SKIP or CONCATENATE attachment strategy, modify the
+ parseMode in the pipesiterators or in the FetchEmitTuple (TIKA-3494).
+
* Fix serialization of embedded docs in OpenSearch emitter (TIKA-3490).
Release 2.0.0 - 07/07/2021
diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index 7b1b436..20f110c 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -372,7 +372,6 @@ public abstract class ConfigBase {
} catch (IllegalAccessException | InvocationTargetException e) {
throw new TikaConfigException("bad parameter " + setter, e);
}
-
} else {
try {
m.invoke(object, value);
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
index 5272642..7fdbc3c 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/HandlerConfig.java
@@ -17,6 +17,7 @@
package org.apache.tika.pipes;
import java.io.Serializable;
+import java.util.Locale;
import java.util.Objects;
import org.apache.tika.sax.BasicContentHandlerFactory;
@@ -29,16 +30,57 @@ public class HandlerConfig implements Serializable {
private static final long serialVersionUID = -3861669115439125268L;
public static HandlerConfig DEFAULT_HANDLER_CONFIG =
- new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, -1, -1);
+ new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.TEXT, PARSE_MODE.RMETA,
+ -1, -1);
+
+ /**
+ * {@link PARSE_MODE#RMETA} "recursive metadata" is the same as the -J option
+ * in tika-app and the /rmeta endpoint in tika-server. Each embedded file is represented as
+ * its own metadata object.
+ *
+ * {@link PARSE_MODE#CONCATENATE} is similar
+ * to the legacy tika-app behavior and the /tika endpoint (accept: application/json) in
+ * tika-server. This concatenates the
+ * contents of embedded files and returns a single metadata object for the file no
+ * matter how many embedded objects there are; this option throws away metadata from
+ * embedded objects and silently skips exceptions in embedded objects.
+ */
+ public enum PARSE_MODE {
+ RMETA,
+ CONCATENATE;
+
+ public static PARSE_MODE parseMode(String modeString) {
+ for (PARSE_MODE m : PARSE_MODE.values()) {
+ if (m.name().equalsIgnoreCase(modeString)) {
+ return m;
+ }
+ }
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ for (PARSE_MODE m : PARSE_MODE.values()) {
+ if (i++ > 0) {
+ sb.append(", ");
+ }
+ sb.append(m.name().toLowerCase(Locale.US));
+ }
+ throw new IllegalArgumentException("mode must be one of: (" + sb +
+ "). I regret I do not understand: " + modeString);
+ }
+ }
private BasicContentHandlerFactory.HANDLER_TYPE type =
BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+
int writeLimit = -1;
int maxEmbeddedResources = -1;
+ PARSE_MODE parseMode = PARSE_MODE.RMETA;
+
- public HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE type, int writeLimit,
+ public HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE type, PARSE_MODE parseMode,
+ int writeLimit,
int maxEmbeddedResources) {
this.type = type;
+ this.parseMode = parseMode;
this.writeLimit = writeLimit;
this.maxEmbeddedResources = maxEmbeddedResources;
}
@@ -55,10 +97,8 @@ public class HandlerConfig implements Serializable {
return maxEmbeddedResources;
}
- @Override
- public String toString() {
- return "HandlerConfig{" + "type=" + type + ", writeLimit=" + writeLimit +
- ", maxEmbeddedResources=" + maxEmbeddedResources + '}';
+ public PARSE_MODE getParseMode() {
+ return parseMode;
}
@Override
@@ -71,11 +111,17 @@ public class HandlerConfig implements Serializable {
}
HandlerConfig that = (HandlerConfig) o;
return writeLimit == that.writeLimit && maxEmbeddedResources == that.maxEmbeddedResources &&
- type == that.type;
+ type == that.type && parseMode == that.parseMode;
}
@Override
public int hashCode() {
- return Objects.hash(type, writeLimit, maxEmbeddedResources);
+ return Objects.hash(type, writeLimit, maxEmbeddedResources, parseMode);
+ }
+
+ @Override
+ public String toString() {
+ return "HandlerConfig{" + "type=" + type + ", writeLimit=" + writeLimit +
+ ", maxEmbeddedResources=" + maxEmbeddedResources + ", mode=" + parseMode + '}';
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index 0e117dd..68b304f 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -28,15 +28,18 @@ import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.EncryptedDocumentException;
import org.apache.tika.exception.TikaException;
+import org.apache.tika.extractor.DocumentSelector;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.AutoDetectParser;
@@ -51,6 +54,7 @@ import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.sax.BasicContentHandlerFactory;
+import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
import org.apache.tika.utils.ExceptionUtils;
import org.apache.tika.utils.StringUtils;
@@ -116,7 +120,8 @@ public class PipesServer implements Runnable {
private final long maxExtractSizeToReturn;
private final long serverParseTimeoutMillis;
private final long serverWaitTimeoutMillis;
- private Parser parser;
+ private Parser autoDetectParser;
+ private Parser rMetaParser;
private TikaConfig tikaConfig;
private FetcherManager fetcherManager;
private EmitterManager emitterManager;
@@ -307,7 +312,7 @@ public class PipesServer implements Runnable {
Metadata metadata = new Metadata();
try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) {
- metadataList = parseMetadata(t, stream, metadata);
+ metadataList = parse(t, stream, metadata);
} catch (SecurityException e) {
LOG.error("security exception " + t.getId(), e);
throw e;
@@ -378,17 +383,73 @@ public class PipesServer implements Runnable {
exit(1);
}
- private List<Metadata> parseMetadata(FetchEmitTuple fetchEmitTuple, InputStream stream,
- Metadata metadata) {
+ private List<Metadata> parse(FetchEmitTuple fetchEmitTuple, InputStream stream,
+ Metadata metadata) {
HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig();
+ if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) {
+ return parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata);
+ } else {
+ return parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata);
+ }
+ }
+
+ private List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple,
+ HandlerConfig handlerConfig, InputStream stream,
+ Metadata metadata) {
+ ContentHandlerFactory contentHandlerFactory =
+ new BasicContentHandlerFactory(handlerConfig.getType(), handlerConfig.getWriteLimit());
+ ContentHandler handler = contentHandlerFactory.getNewContentHandler();
+ ParseContext parseContext = new ParseContext();
+ parseContext.set(DocumentSelector.class, new DocumentSelector() {
+ final int maxEmbedded = handlerConfig.maxEmbeddedResources;
+ int embedded = 0;
+ @Override
+ public boolean select(Metadata metadata) {
+ if (maxEmbedded < 0) {
+ return true;
+ }
+ return embedded++ > maxEmbedded;
+ }
+ });
+
+ String containerException = null;
+ try {
+ autoDetectParser.parse(stream, handler, metadata, parseContext);
+ } catch (SAXException e) {
+ containerException = ExceptionUtils.getStackTrace(e);
+ LOG.warn("sax problem:" + fetchEmitTuple.getId(), e);
+ } catch (EncryptedDocumentException e) {
+ containerException = ExceptionUtils.getStackTrace(e);
+ LOG.warn("encrypted document:" + fetchEmitTuple.getId(), e);
+ } catch (SecurityException e) {
+ LOG.warn("security exception:" + fetchEmitTuple.getId(), e);
+ throw e;
+ } catch (Exception e) {
+ containerException = ExceptionUtils.getStackTrace(e);
+ LOG.warn("exception: " + fetchEmitTuple.getId(), e);
+ } finally {
+ metadata.add(TikaCoreProperties.TIKA_CONTENT, handler.toString());
+ if (containerException != null) {
+ metadata.add(TikaCoreProperties.CONTAINER_EXCEPTION, containerException);
+ }
+ try {
+ tikaConfig.getMetadataFilter().filter(metadata);
+ } catch (TikaException e) {
+ LOG.warn("exception mapping metadata", e);
+ }
+ }
+ return Collections.singletonList(metadata);
+ }
+
+ private List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple,
+ HandlerConfig handlerConfig, InputStream stream,
+ Metadata metadata) {
RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler(
- new BasicContentHandlerFactory(handlerConfig.getType(),
- handlerConfig.getWriteLimit()),
- handlerConfig.getMaxEmbeddedResources(),
- tikaConfig.getMetadataFilter());
+ new BasicContentHandlerFactory(handlerConfig.getType(), handlerConfig.getWriteLimit()),
+ handlerConfig.getMaxEmbeddedResources(), tikaConfig.getMetadataFilter());
ParseContext parseContext = new ParseContext();
try {
- parser.parse(stream, handler, metadata, parseContext);
+ rMetaParser.parse(stream, handler, metadata, parseContext);
} catch (SAXException e) {
LOG.warn("sax problem:" + fetchEmitTuple.getId(), e);
} catch (EncryptedDocumentException e) {
@@ -443,16 +504,10 @@ public class PipesServer implements Runnable {
this.tikaConfig = new TikaConfig(tikaConfigPath);
this.fetcherManager = FetcherManager.load(tikaConfigPath);
this.emitterManager = EmitterManager.load(tikaConfigPath);
- Parser autoDetectParser = new AutoDetectParser(this.tikaConfig);
- this.parser = new RecursiveParserWrapper(autoDetectParser);
-
+ this.autoDetectParser = new AutoDetectParser(this.tikaConfig);
+ this.rMetaParser = new RecursiveParserWrapper(autoDetectParser);
}
- private static class FetchException extends IOException {
- FetchException(Throwable t) {
- super(t);
- }
- }
private void write(EmitData emitData, String stack) {
//TODO -- what do we do with the stack?
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 926f8eb..a717f62 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -48,13 +48,13 @@ public class AsyncProcessor implements Closeable {
static final int PARSER_FUTURE_CODE = 1;
- static final AtomicLong TOTAL_PROCESSED = new AtomicLong(0);
private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
private final ArrayBlockingQueue<EmitData> emitData;
private final ExecutorCompletionService<Integer> executorCompletionService;
private final ExecutorService executorService;
private final AsyncConfig asyncConfig;
+ private final AtomicLong totalProcessed = new AtomicLong(0);
private int numParserThreadsFinished = 0;
private boolean addedEmitterSemaphores = false;
private int finished = 0;
@@ -165,7 +165,7 @@ public class AsyncProcessor implements Closeable {
}
public long getTotalProcessed() {
- return TOTAL_PROCESSED.get();
+ return totalProcessed.get();
}
private class FetchEmitWorker implements Callable<Integer> {
@@ -203,7 +203,7 @@ public class AsyncProcessor implements Closeable {
//TODO -- add timeout, this currently hangs forever
emitDataQueue.offer(result.getEmitData());
}
- TOTAL_PROCESSED.incrementAndGet();
+ totalProcessed.incrementAndGet();
}
checkActive();
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
index dfb5bf4..4227274 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
@@ -70,6 +70,9 @@ public abstract class PipesIterator extends ConfigBase
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
private BasicContentHandlerFactory.HANDLER_TYPE handlerType =
BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
+
+ private HandlerConfig.PARSE_MODE parseMode = HandlerConfig.PARSE_MODE.RMETA;
+
private int writeLimit = -1;
private int maxEmbeddedResources = -1;
@@ -144,10 +147,19 @@ public abstract class PipesIterator extends ConfigBase
}
@Field
- void setMaxEmbeddedResources(int maxEmbeddedResources) {
+ public void setMaxEmbeddedResources(int maxEmbeddedResources) {
this.maxEmbeddedResources = maxEmbeddedResources;
}
+ @Field
+ public void setParseMode(String parseModeString) {
+ setParseMode(HandlerConfig.PARSE_MODE.parseMode(parseModeString));
+ }
+
+ public void setParseMode(HandlerConfig.PARSE_MODE parsePARSEMode) {
+ this.parseMode = parsePARSEMode;
+ }
+
public Integer call() throws Exception {
enqueue();
tryToAdd(COMPLETED_SEMAPHORE);
@@ -155,7 +167,7 @@ public abstract class PipesIterator extends ConfigBase
}
protected HandlerConfig getHandlerConfig() {
- return new HandlerConfig(handlerType, writeLimit, maxEmbeddedResources);
+ return new HandlerConfig(handlerType, parseMode, writeLimit, maxEmbeddedResources);
}
protected abstract void enqueue() throws IOException, TimeoutException, InterruptedException;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
index 7d1c0db..90cabe8 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
@@ -29,6 +29,7 @@ import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
@@ -66,7 +67,8 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
if (! line.startsWith("#") && !StringUtils.isBlank(line)) {
FetchKey fetchKey = new FetchKey(getFetcherName(), line);
EmitKey emitKey = new EmitKey(getEmitterName(), line);
- tryToAdd(new FetchEmitTuple(line, fetchKey, emitKey));
+ tryToAdd(new FetchEmitTuple(line, fetchKey, emitKey,
+ new Metadata(), getHandlerConfig(), getOnParseException()));
}
line = reader.readLine();
}
@@ -83,6 +85,7 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
public void setHasHeader(boolean hasHeader) {
this.hasHeader = hasHeader;
}
+
@Override
public void checkInitialization(InitializableProblemHandler problemHandler)
throws TikaConfigException {
@@ -97,6 +100,4 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
"Must specify an existing file");
}
}
-
-
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
index d1a1923..35f424a 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
@@ -104,7 +104,8 @@ public class FileSystemPipesIterator extends PipesIterator implements Initializa
try {
tryToAdd(new FetchEmitTuple(relPath, new FetchKey(fetcherName, relPath),
- new EmitKey(emitterName, relPath), new Metadata()));
+ new EmitKey(emitterName, relPath), new Metadata(), getHandlerConfig(),
+ getOnParseException()));
} catch (TimeoutException e) {
throw new IOException(e);
} catch (InterruptedException e) {
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java
index f6dfb43..17f2435 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/java/org/apache/tika/pipes/opensearch/tests/TikaPipesOpenSearchTest.java
@@ -17,6 +17,7 @@
package org.apache.tika.pipes.opensearch.tests;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -27,9 +28,10 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Matcher;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.io.IOUtils;
import org.jetbrains.annotations.NotNull;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -39,20 +41,20 @@ import org.testcontainers.utility.DockerImageName;
import org.apache.tika.cli.TikaCLI;
import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.opensearch.JsonResponse;
import org.apache.tika.pipes.emitter.opensearch.OpenSearchEmitter;
public class TikaPipesOpenSearchTest {
- private static final String COLLECTION = "testcol";
+ private static final String TEST_INDEX = "tika-pipes-index";
private static final File TEST_FILE_FOLDER = new File("target", "test-files");
- private final int numHtmlDocs = 42;
private int numTestDocs = 0;
protected GenericContainer<?> openSearch;
private String openSearchHost;
private int openSearchPort;
//this includes the collection, e.g. https://localhost:49213/testcol
- private String openSearchEndpoint;
+ private String openSearchEndpointBase;
private OpenSearchTestClient client;
@Rule
@@ -70,22 +72,164 @@ public class TikaPipesOpenSearchTest {
setupOpenSearch(openSearchContainer);
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
FileUtils.deleteDirectory(TEST_FILE_FOLDER);
}
@Test
- public void testFSToOpenSearch() throws Exception {
+ public void testBasicFSToOpenSearch() throws Exception {
+ int numHtmlDocs = 42;
+ createTestHtmlFiles("Happiness", numHtmlDocs);
+
+ String endpoint = openSearchEndpointBase + TEST_INDEX;
+ sendMappings(endpoint, TEST_INDEX, "opensearch-mappings.json");
+
+ runPipes(OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS,
+ HandlerConfig.PARSE_MODE.CONCATENATE, endpoint);
+
+ String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+ "\"query\": \"happiness\" } } } }";
+
+ JsonResponse results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(numHtmlDocs + numTestDocs,
+ results.getJson().get("hits").get("total").get("value").asInt());
+
+ //now try match all
+ query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} } }";
+ results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(numHtmlDocs + numTestDocs,
+ results.getJson().get("hits").get("total").get("value").asInt());
+ }
+
+ @Test
+ public void testParentChildFSToOpenSearch() throws Exception {
+ int numHtmlDocs = 42;
+ createTestHtmlFiles("Happiness", numHtmlDocs);
+ String endpoint = openSearchEndpointBase + TEST_INDEX;
+ sendMappings(endpoint, TEST_INDEX, "opensearch-parent-child-mappings.json");
+
+ runPipes(OpenSearchEmitter.AttachmentStrategy.PARENT_CHILD,
+ HandlerConfig.PARSE_MODE.RMETA, endpoint);
+
+ String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+ "\"query\": \"happiness\" } } } }";
+
+ JsonResponse results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(numHtmlDocs + numTestDocs,
+ results.getJson().get("hits").get("total").get("value").asInt());
+
+ //now try match all
+ query = "{ " +
+ //"\"from\":0, \"size\":1000," +
+ "\"track_total_hits\": true, \"query\": { " +
+ "\"match_all\": {} } }";
+ results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+ results.getJson().get("hits").get("total").get("value").asInt());
+
+ //now check out one of the embedded files
+ query = "{ \"track_total_hits\": true, \"query\": { \"query_string\": { " +
+ "\"default_field\": \"content\", " +
+ "\"query\": \"embed4 zip\" , \"minimum_should_match\":2 } } } ";
+ results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(1,
+ results.getJson().get("hits").get("total").get("value").asInt());
+ JsonNode source = results.getJson().get("hits").get("hits").get(0).get("_source");
+
+ assertEquals("test_recursive_embedded.docx-9",
+ results.getJson().get("hits").get("hits").get(0).get("_id").asText());
+ assertEquals("test_recursive_embedded.docx",
+ results.getJson().get("hits").get("hits").get(0).get("_routing").asText());
+ assertEquals("test_recursive_embedded.docx",
+ source.get("relation_type").get("parent").asText());
+ assertEquals("embedded",
+ source.get("relation_type").get("name").asText());
+
+ assertEquals("application/zip", source.get("mime").asText());
+
+ //now make sure all the children are returned by a parent search
+ query = "{ \"track_total_hits\": true, \"query\": { \"parent_id\": { " +
+ "\"type\": \"embedded\", " +
+ "\"id\": \"test_recursive_embedded.docx\" } } } ";
+ results = client.postJson(endpoint + "/_search", query);
+ assertEquals(11,
+ results.getJson().get("hits").get("total").get("value").asInt());
+ }
+
+
+ @Test
+ public void testSeparateDocsFSToOpenSearch() throws Exception {
+ int numHtmlDocs = 42;
+ createTestHtmlFiles("Happiness", numHtmlDocs);
+ String endpoint = openSearchEndpointBase + TEST_INDEX;
+ sendMappings(endpoint, TEST_INDEX, "opensearch-mappings.json");
+
+ runPipes(OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS,
+ HandlerConfig.PARSE_MODE.RMETA, endpoint);
+
+ String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
+ "\"query\": \"happiness\" } } } }";
+
+ JsonResponse results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(numHtmlDocs + numTestDocs,
+ results.getJson().get("hits").get("total").get("value").asInt());
+
+ //now try match all
+ query = "{ " +
+ //"\"from\":0, \"size\":1000," +
+ "\"track_total_hits\": true, \"query\": { " +
+ "\"match_all\": {} } }";
+ results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(numHtmlDocs + 12, //the .docx file has 11 embedded files, plus itself
+ results.getJson().get("hits").get("total").get("value").asInt());
+
+ //now check out one of the embedded files
+ query = "{ \"track_total_hits\": true, \"query\": { \"query_string\": { " +
+ "\"default_field\": \"content\", " +
+ "\"query\": \"embed4 zip\" , \"minimum_should_match\":2 } } } ";
+ results = client.postJson(endpoint + "/_search", query);
+ assertEquals(200, results.getStatus());
+ assertEquals(1,
+ results.getJson().get("hits").get("total").get("value").asInt());
+ JsonNode source = results.getJson().get("hits").get("hits").get(0).get("_source");
+
+ assertEquals("test_recursive_embedded.docx-9",
+ results.getJson().get("hits").get("hits").get(0).get("_id").asText());
+ assertNull("test_recursive_embedded.docx",
+ results.getJson().get("hits").get("hits").get(0).get("_routing"));
+ assertNull("test_recursive_embedded.docx",
+ source.get("relation_type"));
+
+ assertEquals("application/zip", source.get("mime").asText());
+
+ //now make sure there are no children; this query should
+ //cause an exception because there are no relationships in the schema
+ query = "{ \"track_total_hits\": true, \"query\": { \"parent_id\": { " +
+ "\"type\": \"embedded\", " +
+ "\"id\": \"test_recursive_embedded.docx\" } } } ";
+ results = client.postJson(endpoint + "/_search", query);
+ assertEquals(400, results.getStatus());
+ }
+
+
+ private void sendMappings(String endpoint, String index, String mappingsFile) throws Exception {
//create the collection with mappings
String mappings = IOUtils.toString(TikaPipesOpenSearchTest.class.getResourceAsStream(
- "/opensearch/opensearch-mappings.json"), StandardCharsets.UTF_8);
+ "/opensearch/" + mappingsFile), StandardCharsets.UTF_8);
int status = -1;
int tries = 0;
JsonResponse response = null;
//need to wait a bit sometimes before OpenSearch is up
while (status != 200 && tries++ < 20) {
- response = client.putJson(openSearchEndpoint, mappings);
+ response = client.putJson(endpoint, mappings);
if (status != 200) {
Thread.sleep(1000);
}
@@ -96,31 +240,12 @@ public class TikaPipesOpenSearchTest {
response);
}
assertTrue(response.getJson().get("acknowledged").asBoolean());
- assertEquals("testcol", response.getJson().get("index").asText());
-
- runPipes(OpenSearchEmitter.AttachmentStrategy.SKIP);
- //refresh to make sure the content is searchable
- JsonResponse refresh = client.getJson(openSearchEndpoint + "/_refresh");
-
- String query = "{ \"track_total_hits\": true, \"query\": { \"match\": { \"content\": { " +
- "\"query\": \"happiness\" } } } }";
-
- JsonResponse results = client.postJson(openSearchEndpoint + "/_search", query);
- assertEquals(200, results.getStatus());
- //assertEquals(numHtmlDocs + numTestDocs,
- // results.getJson().get("hits").get("total").get("value").asInt());
-
- //now try match all
- query = "{ \"track_total_hits\": true, \"query\": { \"match_all\": {} } }";
- results = client.postJson(openSearchEndpoint + "/_search", query);
- assertEquals(200, results.getStatus());
- assertEquals(numHtmlDocs + numTestDocs,
- results.getJson().get("hits").get("total").get("value").asInt());
-
+ assertEquals(index, response.getJson().get("index").asText());
}
- private void runPipes(OpenSearchEmitter.AttachmentStrategy attachmentStrategy) throws Exception {
+ private void runPipes(OpenSearchEmitter.AttachmentStrategy attachmentStrategy,
+ HandlerConfig.PARSE_MODE parseMode, String endpoint) throws Exception {
File tikaConfigFile = new File("target", "ta-opensearch.xml");
File log4jPropFile = new File("target", "tmp-log4j2.xml");
@@ -136,45 +261,48 @@ public class TikaPipesOpenSearchTest {
String tikaConfigXml =
createTikaConfigXml(tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
- attachmentStrategy);
+ attachmentStrategy, parseMode, endpoint);
FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
+ //refresh to make sure the content is searchable
+ JsonResponse refresh = client.getJson(endpoint + "/_refresh");
}
@NotNull
private String createTikaConfigXml(File tikaConfigFile, File log4jPropFile, String tikaConfigTemplateXml,
- OpenSearchEmitter.AttachmentStrategy attachmentStrategy) {
+ OpenSearchEmitter.AttachmentStrategy attachmentStrategy,
+ HandlerConfig.PARSE_MODE parseMode, String endpoint) {
String res =
tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
.replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
.replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
.replaceAll("\\{PATH_TO_DOCS\\}",
- Matcher.quoteReplacement(TEST_FILE_FOLDER.getAbsolutePath()));
+ Matcher.quoteReplacement(TEST_FILE_FOLDER.getAbsolutePath()))
+ .replace("{PARSE_MODE}", parseMode.name());
- res = res.replace("{OPENSEARCH_CONNECTION}", openSearchEndpoint);
+ res = res.replace("{OPENSEARCH_CONNECTION}", endpoint);
return res;
}
private void setupOpenSearch(GenericContainer<?> openSearchContainer) throws Exception {
- createTestHtmlFiles("Happiness");
this.openSearch = openSearchContainer;
openSearchHost = openSearch.getHost();
openSearchPort = openSearch.getMappedPort(9200);
- openSearchEndpoint = "https://" + openSearchHost + ":" + openSearchPort + "/" + COLLECTION;
+ openSearchEndpointBase = "https://" + openSearchHost + ":" + openSearchPort + "/";
HttpClientFactory httpClientFactory = new HttpClientFactory();
httpClientFactory.setUserName("admin");
httpClientFactory.setPassword("admin");
//attachment strategy is not used here...TODO clean this up
- client = new OpenSearchTestClient(openSearchEndpoint,
- httpClientFactory.build(), OpenSearchEmitter.AttachmentStrategy.SKIP);
+ client = new OpenSearchTestClient(openSearchEndpointBase,
+ httpClientFactory.build(), OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS);
}
- private void createTestHtmlFiles(String bodyContent) throws Exception {
+ private void createTestHtmlFiles(String bodyContent, int numHtmlDocs) throws Exception {
TEST_FILE_FOLDER.mkdirs();
for (int i = 0; i < numHtmlDocs; ++i) {
FileUtils.writeStringToFile(new File(TEST_FILE_FOLDER, "test-" + i + ".html"),
diff --git a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
index 42a2bc3..af0b53b 100644
--- a/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
+++ b/tika-integration-tests/tika-pipes-opensearch-integration-tests/src/test/resources/opensearch/tika-config-opensearch.xml
@@ -65,12 +65,11 @@
<emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
<emitWithinMillis>60000</emitWithinMillis>
<numEmitters>1</numEmitters>
- <numClients>1</numClients>
+ <numClients>3</numClients>
<tikaConfig>{TIKA_CONFIG}</tikaConfig>
<forkedJvmArgs>
- <arg>-Xmx1g</arg>
+ <arg>-Xmx512m</arg>
<arg>-XX:ParallelGCThreads=2</arg>
- <arg>-XX:+ExitOnOutOfMemoryError</arg>
<arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
</forkedJvmArgs>
<timeoutMillis>60000</timeoutMillis>
@@ -93,7 +92,6 @@
<updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
-->
<attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
- <contentField>content</contentField>
<commitWithin>10</commitWithin>
<idField>_id</idField>
<connectionTimeout>10000</connectionTimeout>
@@ -108,6 +106,7 @@
<basePath>{PATH_TO_DOCS}</basePath>
<fetcherName>fsf</fetcherName>
<emitterName>ose</emitterName>
+ <parseMode>{PARSE_MODE}</parseMode>
</params>
</pipesIterator>
</properties>
diff --git a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
index 5083ff0..3810b3d 100644
--- a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
+++ b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/java/org/apache/tika/pipes/solr/tests/TikaPipesSolrTestBase.java
@@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.jetbrains.annotations.NotNull;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -35,6 +36,7 @@ import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
import org.testcontainers.utility.DockerImageName;
import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.solr.SolrEmitter;
public abstract class TikaPipesSolrTestBase {
@@ -62,6 +64,11 @@ public abstract class TikaPipesSolrTestBase {
setupSolr(solrContainer);
}
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.deleteDirectory(testFileFolder);
+ }
+
@Test
public void testFetchIteratorWithSolrUrls() throws Exception {
runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter();
@@ -120,7 +127,8 @@ public abstract class TikaPipesSolrTestBase {
String tikaConfigXml =
createTikaConfigXml(useZk(), tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
SolrEmitter.UpdateStrategy.ADD,
- SolrEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+ SolrEmitter.AttachmentStrategy.PARENT_CHILD,
+ HandlerConfig.PARSE_MODE.CONCATENATE);
FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
@@ -142,7 +150,8 @@ public abstract class TikaPipesSolrTestBase {
tikaConfigXml =
createTikaConfigXml(useZk(), tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
SolrEmitter.UpdateStrategy.UPDATE_MUST_EXIST,
- SolrEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+ SolrEmitter.AttachmentStrategy.PARENT_CHILD,
+ HandlerConfig.PARSE_MODE.CONCATENATE);
FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
TikaCLI.main(new String[]{"-a", "--config=" + tikaConfigFile.getAbsolutePath()});
@@ -163,13 +172,15 @@ public abstract class TikaPipesSolrTestBase {
private String createTikaConfigXml(boolean useZk, File tikaConfigFile, File log4jPropFile,
String tikaConfigTemplateXml,
SolrEmitter.UpdateStrategy updateStrategy,
- SolrEmitter.AttachmentStrategy attachmentStrategy) {
+ SolrEmitter.AttachmentStrategy attachmentStrategy,
+ HandlerConfig.PARSE_MODE parseMode) {
String res =
tikaConfigTemplateXml.replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
.replace("{UPDATE_STRATEGY}", updateStrategy.toString())
.replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
.replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
- .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath());
+ .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath())
+ .replace("{PARSE_MODE}", parseMode.name());
if (useZk) {
res = res.replace("{SOLR_CONNECTION}",
"<solrZkHosts>\n" + " <solrZkHost>" + solrHost + ":" + zkPort +
diff --git a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml
index 121185b..5f2740f 100644
--- a/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml
+++ b/tika-integration-tests/tika-pipes-solr-integration-tests/src/test/resources/tika-config-solr-urls.xml
@@ -92,7 +92,6 @@
<updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
<solrCollection>testcol</solrCollection>
<attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
- <contentField>content</contentField>
<commitWithin>10</commitWithin>
<idField>id</idField>
<connectionTimeout>10000</connectionTimeout>
@@ -106,7 +105,7 @@
</params>
</emitter>
</emitters>
- <pipesIterator class="org.apache.tika.pipes.solrtest.SolrPipesIterator">
+ <pipesIterator class="org.apache.tika.pipes.pipesiterator.solr.SolrPipesIterator">
<params>
<solrCollection>testcol</solrCollection>
{SOLR_CONNECTION}
@@ -114,6 +113,7 @@
<parsingIdField>parsing_id_i</parsingIdField>
<failCountField>fail_count_i</failCountField>
<sizeFieldName>size_i</sizeFieldName>
+ <parseMode>{PARSE_MODE}</parseMode>
<rows>10</rows>
<fetcherName>fsf</fetcherName>
<emitterName>se</emitterName>
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
index ab348ad..face044 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -113,6 +113,8 @@ public class OpenSearchClient {
jsonGenerator.writeStringField("parent", emitKey);
//end the relation type object
jsonGenerator.writeEndObject();
+ } else if (attachmentStrategy == OpenSearchEmitter.AttachmentStrategy.SEPARATE_DOCUMENTS) {
+ jsonGenerator.writeStringField("parent", emitKey);
}
//end the metadata object
jsonGenerator.writeEndObject();
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
index 5f61814..208f138 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
@@ -19,7 +19,6 @@ package org.apache.tika.pipes.emitter.opensearch;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -43,7 +42,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
public enum AttachmentStrategy {
- SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+ SEPARATE_DOCUMENTS, PARENT_CHILD,
//anything else?
}
@@ -51,7 +50,6 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
private AttachmentStrategy attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
private String openSearchUrl = null;
- private String contentField = "content";
private String idField = "_id";
private int commitWithin = 1000;
private OpenSearchClient openSearchClient;
@@ -69,144 +67,27 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
return;
}
try {
- if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
- metadataList = concatenate(metadataList);
- } else if (attachmentStrategy == AttachmentStrategy.SKIP) {
- metadataList = Collections.singletonList(metadataList.get(0));
- }
openSearchClient.addDocument(emitKey, metadataList);
} catch (TikaClientException e) {
throw new TikaEmitterException("failed to add document", e);
}
}
- private List<Metadata> concatenate(List<Metadata> metadataList) {
- if (metadataList.size() == 1) {
- return metadataList;
- }
-
- Metadata ret = metadataList.get(0);
- StringBuilder content = new StringBuilder();
- for (Metadata m : metadataList) {
- String c = m.get(getContentField());
- if (! StringUtils.isBlank(c)) {
- content.append(c).append("\n");
- }
- }
- ret.set(getContentField(), content.toString());
- return Collections.singletonList(ret);
-
- }
-/*
- private void addMetadataAsSolrInputDocuments(String emitKey, List<Metadata> metadataList,
- List<SolrInputDocument> docsToUpdate)
- throws IOException, TikaEmitterException {
- SolrInputDocument solrInputDocument = new SolrInputDocument();
- solrInputDocument.setField(idField, emitKey);
- if (updateStrategy == UpdateStrategy.UPDATE_MUST_EXIST) {
- solrInputDocument.setField("_version_", 1);
- } else if (updateStrategy == UpdateStrategy.UPDATE_MUST_NOT_EXIST) {
- solrInputDocument.setField("_version_", -1);
- }
- if (attachmentStrategy == AttachmentStrategy.SKIP || metadataList.size() == 1) {
- addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
- } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
- //this only handles text for now, not xhtml
- StringBuilder sb = new StringBuilder();
- for (Metadata metadata : metadataList) {
- String content = metadata.get(getContentField());
- if (content != null) {
- sb.append(content).append("\n");
- }
- }
- Metadata parent = metadataList.get(0);
- parent.set(getContentField(), sb.toString());
- addMetadataToSolrInputDocument(parent, solrInputDocument, updateStrategy);
- } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
- addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
- for (int i = 1; i < metadataList.size(); i++) {
- SolrInputDocument childSolrInputDocument = new SolrInputDocument();
- Metadata m = metadataList.get(i);
- childSolrInputDocument.setField(idField, UUID.randomUUID().toString());
- addMetadataToSolrInputDocument(m, childSolrInputDocument, updateStrategy);
- }
- } else {
- throw new IllegalArgumentException(
- "I don't yet support this attachment strategy: " + attachmentStrategy);
- }
- docsToUpdate.add(solrInputDocument);
- }
-
- @Override
- public void emit(List<? extends EmitData> batch) throws IOException, TikaEmitterException {
- if (batch == null || batch.size() == 0) {
- LOG.warn("batch is null or empty");
- return;
- }
- List<SolrInputDocument> docsToUpdate = new ArrayList<>();
- for (EmitData d : batch) {
- addMetadataAsSolrInputDocuments(d.getEmitKey().getEmitKey(), d.getMetadataList(),
- docsToUpdate);
- }
- emitSolrBatch(docsToUpdate);
- }
-
- private void emitSolrBatch(List<SolrInputDocument> docsToUpdate)
- throws IOException, TikaEmitterException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Emitting solr doc batch: {}", docsToUpdate);
- }
- if (!docsToUpdate.isEmpty()) {
- try {
- UpdateRequest req = new UpdateRequest();
- req.add(docsToUpdate);
- req.setCommitWithin(commitWithin);
- req.setParam("failOnVersionConflicts", "false");
- req.process(openSearchClient, solrCollection);
- } catch (Exception e) {
- throw new TikaEmitterException("Could not add batch to solr", e);
- }
- }
- }
-
- private void addMetadataToSolrInputDocument(Metadata metadata,
- SolrInputDocument solrInputDocument,
- UpdateStrategy updateStrategy) {
- for (String n : metadata.names()) {
- String[] vals = metadata.getValues(n);
- if (vals.length == 0) {
- continue;
- } else if (vals.length == 1) {
- if (updateStrategy == UpdateStrategy.ADD) {
- solrInputDocument.setField(n, vals[0]);
- } else {
- solrInputDocument.setField(n, new HashMap<String, String>() {{
- put("set", vals[0]);
- }
- });
- }
- } else if (vals.length > 1) {
- if (updateStrategy == UpdateStrategy.ADD) {
- solrInputDocument.setField(n, vals);
- } else {
- solrInputDocument.setField(n, new HashMap<String, String[]>() {{
- put("set", vals);
- }
- });
- }
- }
- }
- }*/
/**
- * Options: SKIP, CONCATENATE_CONTENT, PARENT_CHILD. Default is "PARENT_CHILD".
- * If set to "SKIP", this will index only the main file and ignore all info
- * in the attachments. If set to "CONCATENATE_CONTENT", this will concatenate the
- * content extracted from the attachments into the main document and
- * then index the main document with the concatenated content _and_ the
- * main document's metadata (metadata from attachments will be thrown away).
- * If set to "PARENT_CHILD", this will index the attachments as children
- * of the parent document via OpenSearch's parent-child relationship.
+ * Options: SEPARATE_DOCUMENTS, PARENT_CHILD. Default is "PARENT_CHILD".
+ * With SEPARATE_DOCUMENTS, all embedded documents are treated as independent documents.
+ * PARENT_CHILD requires a schema to be set up for the relationship type;
+ * all embedded objects (no matter how deeply nested) will have a single
+ * parent of the main container document.
+ *
+ * If you want to concatenate the content of embedded files and ignore
+ * the metadata of embedded files, set
+ * {@link org.apache.tika.pipes.HandlerConfig}'s parseMode to
+ * {@link org.apache.tika.pipes.HandlerConfig.PARSE_MODE#CONCATENATE}
+ * in your {@link org.apache.tika.pipes.FetchEmitTuple} or in the
+ * {@code <parseMode>} element in your {@link org.apache.tika.pipes.pipesiterator.PipesIterator}
+ * configuration.
*/
@Field
public void setAttachmentStrategy(String attachmentStrategy) {
@@ -224,24 +105,6 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
httpClientFactory.setSocketTimeout(socketTimeout);
}
- public String getContentField() {
- return contentField;
- }
-
- /**
- * This is the field _after_ metadata mappings have been applied
- * that contains the "content" for each metadata object.
- * <p>
- * This is the field that is used if {@link #attachmentStrategy}
- * is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
- *
- * @param contentField
- */
- @Field
- public void setContentField(String contentField) {
- this.contentField = contentField;
- }
-
public int getCommitWithin() {
return commitWithin;
}
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 534e899..a1388e0 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -91,20 +91,9 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
} else if (updateStrategy == UpdateStrategy.UPDATE_MUST_NOT_EXIST) {
solrInputDocument.setField("_version_", -1);
}
- if (attachmentStrategy == AttachmentStrategy.SKIP || metadataList.size() == 1) {
+ if (metadataList.size() == 1) {
addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
- } else if (attachmentStrategy == AttachmentStrategy.CONCATENATE_CONTENT) {
- //this only handles text for now, not xhtml
- StringBuilder sb = new StringBuilder();
- for (Metadata metadata : metadataList) {
- String content = metadata.get(getContentField());
- if (content != null) {
- sb.append(content).append("\n");
- }
- }
- Metadata parent = metadataList.get(0);
- parent.set(getContentField(), sb.toString());
- addMetadataToSolrInputDocument(parent, solrInputDocument, updateStrategy);
+ docsToUpdate.add(solrInputDocument);
} else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
for (int i = 1; i < metadataList.size(); i++) {
@@ -112,12 +101,24 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
Metadata m = metadataList.get(i);
childSolrInputDocument.setField(idField, UUID.randomUUID().toString());
addMetadataToSolrInputDocument(m, childSolrInputDocument, updateStrategy);
+ solrInputDocument.addChildDocument(childSolrInputDocument);
+ }
+ docsToUpdate.add(solrInputDocument);
+ } else if (attachmentStrategy == AttachmentStrategy.SEPARATE_DOCUMENTS) {
+ addMetadataToSolrInputDocument(metadataList.get(0), solrInputDocument, updateStrategy);
+ docsToUpdate.add(solrInputDocument);
+ for (int i = 1; i < metadataList.size(); i++) {
+ SolrInputDocument childSolrInputDocument = new SolrInputDocument();
+ Metadata m = metadataList.get(i);
+ childSolrInputDocument.setField(idField,
+ solrInputDocument.get(idField) + "-" + UUID.randomUUID().toString());
+ addMetadataToSolrInputDocument(m, childSolrInputDocument, updateStrategy);
+ docsToUpdate.add(childSolrInputDocument);
}
} else {
throw new IllegalArgumentException(
"I don't yet support this attachment strategy: " + attachmentStrategy);
}
- docsToUpdate.add(solrInputDocument);
}
@Override
@@ -211,24 +212,6 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
this.socketTimeout = socketTimeout;
}
- public String getContentField() {
- return contentField;
- }
-
- /**
- * This is the field _after_ metadata mappings have been applied
- * that contains the "content" for each metadata object.
- * <p>
- * This is the field that is used if {@link #attachmentStrategy}
- * is {@link AttachmentStrategy#CONCATENATE_CONTENT}.
- *
- * @param contentField
- */
- @Field
- public void setContentField(String contentField) {
- this.contentField = contentField;
- }
-
public int getCommitWithin() {
return commitWithin;
}
@@ -326,7 +309,7 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
}
public enum AttachmentStrategy {
- SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
+ SEPARATE_DOCUMENTS, PARENT_CHILD,
//anything else?
}
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
similarity index 99%
rename from tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
index 85b070c..4cc50d0 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.solrtest;
+package org.apache.tika.pipes.pipesiterator.solr;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
index 53a4f2d..cb23b34 100644
--- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
+++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java
@@ -48,6 +48,7 @@ public class JsonFetchEmitTuple {
private static final String HANDLER_CONFIG_TYPE = "type";
private static final String HANDLER_CONFIG_WRITE_LIMIT = "writeLimit";
private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources";
+ private static final String HANDLER_CONFIG_PARSE_MODE = "parseMode";
public static FetchEmitTuple fromJson(Reader reader) throws IOException {
@@ -127,6 +128,7 @@ public class JsonFetchEmitTuple {
BasicContentHandlerFactory.HANDLER_TYPE.TEXT;
int writeLimit = -1;
int maxEmbeddedResources = -1;
+ HandlerConfig.PARSE_MODE parseMode = HandlerConfig.PARSE_MODE.RMETA;
String fieldName = jParser.nextFieldName();
while (fieldName != null) {
switch (fieldName) {
@@ -141,13 +143,17 @@ public class JsonFetchEmitTuple {
case HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES:
maxEmbeddedResources = jParser.nextIntValue(-1);
break;
+ case HANDLER_CONFIG_PARSE_MODE:
+ String modeString = jParser.nextTextValue();
+ parseMode = HandlerConfig.PARSE_MODE.parseMode(modeString);
+ break;
default:
throw new IllegalArgumentException("I regret I don't understand '" + fieldName +
"' in the context of a handler config");
}
fieldName = jParser.nextFieldName();
}
- return new HandlerConfig(handlerType, writeLimit, maxEmbeddedResources);
+ return new HandlerConfig(handlerType, parseMode, writeLimit, maxEmbeddedResources);
}
private static String getValue(JsonParser jParser) throws IOException {
@@ -189,6 +195,8 @@ public class JsonFetchEmitTuple {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField(HANDLER_CONFIG_TYPE,
t.getHandlerConfig().getType().name().toLowerCase(Locale.ROOT));
+ jsonGenerator.writeStringField(HANDLER_CONFIG_PARSE_MODE,
+ t.getHandlerConfig().getParseMode().name().toLowerCase(Locale.ROOT));
jsonGenerator.writeNumberField(HANDLER_CONFIG_WRITE_LIMIT,
t.getHandlerConfig().getWriteLimit());
jsonGenerator.writeNumberField(HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES,
diff --git a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
index c335bbc..56ad551 100644
--- a/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
+++ b/tika-serialization/src/test/java/org/apache/tika/metadata/serialization/JsonFetchEmitTupleTest.java
@@ -44,7 +44,9 @@ public class JsonFetchEmitTupleTest {
FetchEmitTuple t = new FetchEmitTuple("my_id", new FetchKey("my_fetcher", "fetchKey1"),
new EmitKey("my_emitter", "emitKey1"), m,
- new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, 10000, 10),
+ new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML,
+ HandlerConfig.PARSE_MODE.CONCATENATE,
+ 10000,10),
FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
index cd43f1b..c001f0b 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/RecursiveMetadataResource.java
@@ -122,11 +122,12 @@ public class RecursiveMetadataResource {
return Response
.ok(parseMetadataToMetadataList(att.getObject(InputStream.class), new Metadata(),
att.getHeaders(), info,
- buildHandlerConfig(att.getHeaders(), handlerTypeName))).build();
+ buildHandlerConfig(att.getHeaders(), handlerTypeName,
+ HandlerConfig.PARSE_MODE.RMETA))).build();
}
static HandlerConfig buildHandlerConfig(MultivaluedMap<String, String> httpHeaders,
- String handlerTypeName) {
+ String handlerTypeName, HandlerConfig.PARSE_MODE parseMode) {
int writeLimit = -1;
if (httpHeaders.containsKey("writeLimit")) {
writeLimit = Integer.parseInt(httpHeaders.getFirst("writeLimit"));
@@ -138,6 +139,7 @@ public class RecursiveMetadataResource {
}
return new HandlerConfig(
BasicContentHandlerFactory.parseHandlerType(handlerTypeName, DEFAULT_HANDLER_TYPE),
+ parseMode,
writeLimit, maxEmbeddedResources);
}
@@ -176,7 +178,8 @@ public class RecursiveMetadataResource {
return Response.ok(parseMetadataToMetadataList(
TikaResource.getInputStream(is, metadata, httpHeaders), metadata,
httpHeaders.getRequestHeaders(), info,
- buildHandlerConfig(httpHeaders.getRequestHeaders(), handlerTypeName))).build();
+ buildHandlerConfig(httpHeaders.getRequestHeaders(), handlerTypeName,
+ HandlerConfig.PARSE_MODE.RMETA))).build();
}
private MetadataList parseMetadataToMetadataList(InputStream is, Metadata metadata,
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
index bc7e824..b63e29d 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaPipesTest.java
@@ -215,7 +215,8 @@ public class TikaPipesTest extends CXFTestBase {
new FetchKey("fsf", "hello_world.xml"),
new EmitKey("fse", ""),
userMetadata,
- new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML, -1, -1),
+ new HandlerConfig(BasicContentHandlerFactory.HANDLER_TYPE.XML,
+ HandlerConfig.PARSE_MODE.RMETA, -1, -1),
FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT);
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(t, writer);