You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/06/03 15:57:28 UTC
[tika] 02/02: TIKA-3435 -- allow fetchers only when
enableUnsecureFeatures=true
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 54e6b8c96baa32919165a3bff735633acf827ea8
Author: tballison <ta...@apache.org>
AuthorDate: Thu Jun 3 11:57:09 2021 -0400
TIKA-3435 -- allow fetchers only when enableUnsecureFeatures=true
---
.../apache/tika/server/core/TikaServerConfig.java | 83 +++++++++++++++++++++-
.../apache/tika/server/core/TikaServerProcess.java | 34 ++++-----
.../tika/server/core/resource/AsyncResource.java | 10 +--
.../tika/server/core/TikaServerConfigTest.java | 26 ++++++-
.../tika-config-server-fetchers-emitters.xml | 49 +++++++++++++
5 files changed, 171 insertions(+), 31 deletions(-)
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
index b8c9473..ead68bf 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerConfig.java
@@ -31,12 +31,16 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.cli.CommandLine;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
import org.apache.tika.config.ConfigBase;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.utils.ProcessUtils;
import org.apache.tika.utils.StringUtils;
+import org.apache.tika.utils.XMLReaderUtils;
public class TikaServerConfig extends ConfigBase {
@@ -97,6 +101,8 @@ public class TikaServerConfig extends ConfigBase {
private boolean returnStackTrace = false;
private boolean noFork = false;
private String tempFilePrefix = "apache-tika-server-forked-tmp-"; //can be set for debugging
+ private Set<String> supportedFetchers = new HashSet<>();
+ private Set<String> supportedEmitters = new HashSet<>();
private List<String> forkedJvmArgs = new ArrayList<>();
private String idBase = UUID.randomUUID().toString();
private String port = Integer.toString(DEFAULT_PORT);
@@ -126,7 +132,6 @@ public class TikaServerConfig extends ConfigBase {
Set<String> settings = new HashSet<>();
if (commandLine.hasOption("c")) {
config = load(Paths.get(commandLine.getOptionValue("c")), commandLine, settings);
- config.setConfigPath(commandLine.getOptionValue("c"));
} else {
config = new TikaServerConfig();
}
@@ -169,11 +174,17 @@ public class TikaServerConfig extends ConfigBase {
static TikaServerConfig load(Path p, CommandLine commandLine, Set<String> settings) throws IOException,
TikaException {
try (InputStream is = Files.newInputStream(p)) {
- return TikaServerConfig.load(is, commandLine, settings);
+ TikaServerConfig config = TikaServerConfig.load(is, commandLine, settings);
+ if (config.getConfigPath() == null) {
+ config.setConfigPath(p.toAbsolutePath().toString());
+ }
+ loadSupportedFetchersEmitters(config);
+ return config;
}
}
- static TikaServerConfig load(InputStream is, CommandLine commandLine, Set<String> settings)
+ private static TikaServerConfig load(InputStream is, CommandLine commandLine,
+ Set<String> settings)
throws IOException, TikaException {
TikaServerConfig tikaServerConfig = new TikaServerConfig();
Set<String> configSettings = tikaServerConfig.configure("server", is);
@@ -185,6 +196,65 @@ public class TikaServerConfig extends ConfigBase {
return tikaServerConfig;
}
+ /**
+  * Re-reads the config file at {@code tikaServerConfig.getConfigPath()} as a DOM and
+  * records the names of the fetchers and emitters declared directly under the root
+  * {@code properties} element into the config's supported fetcher/emitter sets.
+  *
+  * @param tikaServerConfig config whose config path is read and whose sets are filled
+  * @throws IOException if the file cannot be opened or the XML cannot be parsed
+  *                     (a {@link SAXException} is wrapped)
+  * @throws TikaConfigException if building the DOM fails with a TikaException or the
+  *                             root element is not {@code properties}
+  */
+ private static void loadSupportedFetchersEmitters(TikaServerConfig tikaServerConfig)
+         throws IOException, TikaConfigException {
+     //this is an abomination... clean up this double read
+     try (InputStream is = Files.newInputStream(tikaServerConfig.getConfigPath())) {
+         Node properties;
+         try {
+             properties = XMLReaderUtils.buildDOM(is).getDocumentElement();
+         } catch (SAXException e) {
+             throw new IOException(e);
+         } catch (TikaException e) {
+             throw new TikaConfigException("problem loading xml to dom", e);
+         }
+         //constant-first comparison, like the checks below: Node.getLocalName() is
+         //allowed to return null, and that should surface as a config error rather
+         //than a NullPointerException
+         if (!"properties".equals(properties.getLocalName())) {
+             throw new TikaConfigException("expect properties as root node");
+         }
+         NodeList children = properties.getChildNodes();
+         for (int i = 0; i < children.getLength(); i++) {
+             Node child = children.item(i);
+             if ("fetchers".equals(child.getLocalName())) {
+                 loadSupported(child, "fetcher", tikaServerConfig.supportedFetchers);
+             } else if ("emitters".equals(child.getLocalName())) {
+                 loadSupported(child, "emitter", tikaServerConfig.supportedEmitters);
+             }
+         }
+     }
+ }
+
+ /**
+  * Adds to {@code supported} the name of every direct child of {@code compound}
+  * whose local name equals {@code itemName}; children for which no name is found
+  * are skipped.
+  *
+  * @param compound  the container node whose direct children are scanned
+  * @param itemName  local name of the child elements to look at
+  * @param supported set that receives the names that were found
+  */
+ private static void loadSupported(Node compound,
+                                   String itemName,
+                                   Set<String> supported) {
+     NodeList items = compound.getChildNodes();
+     for (int idx = 0; idx < items.getLength(); idx++) {
+         Node item = items.item(idx);
+         if (!itemName.equals(item.getLocalName())) {
+             continue;
+         }
+         String declaredName = getName(item);
+         if (declaredName != null) {
+             supported.add(declaredName);
+         }
+     }
+ }
+
+ /**
+  * Looks through the {@code params} children of the given node and returns the
+  * text content of the first {@code name} element found inside one of them.
+  *
+  * @param fetcherOrEmitter the node to search
+  * @return the text content of the first matching {@code name} element, or
+  *         {@code null} if there is none
+  */
+ private static String getName(Node fetcherOrEmitter) {
+     NodeList sections = fetcherOrEmitter.getChildNodes();
+     for (int s = 0; s < sections.getLength(); s++) {
+         Node section = sections.item(s);
+         if (!"params".equals(section.getLocalName())) {
+             continue;
+         }
+         NodeList entries = section.getChildNodes();
+         for (int e = 0; e < entries.getLength(); e++) {
+             Node entry = entries.item(e);
+             if ("name".equals(entry.getLocalName())) {
+                 return entry.getTextContent();
+             }
+         }
+     }
+     return null;
+ }
+
public boolean isNoFork() {
return noFork;
}
@@ -507,4 +577,11 @@ public class TikaServerConfig extends ConfigBase {
return indivPorts.stream().mapToInt(Integer::intValue).toArray();
}
+ /**
+  * @return the set of fetcher names held by this config; this is the internal
+  *         set itself, not a copy
+  */
+ public Set<String> getSupportedFetchers() {
+     return this.supportedFetchers;
+ }
+
+ /**
+  * @return the set of emitter names held by this config; this is the internal
+  *         set itself, not a copy
+  */
+ public Set<String> getSupportedEmitters() {
+     return this.supportedEmitters;
+ }
}
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
index 82207c4..c01c046 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/TikaServerProcess.java
@@ -112,29 +112,14 @@ public class TikaServerProcess {
public static void main(String[] args) throws Exception {
LOG.info("Starting {} server", new Tika());
- AsyncResource asyncResource = null;
try {
Options options = getOptions();
CommandLineParser cliParser = new DefaultParser();
CommandLine line = cliParser.parse(options, args);
TikaServerConfig tikaServerConfig = TikaServerConfig.load(line);
LOG.debug("forked config: {}", tikaServerConfig);
- if (tikaServerConfig.isEnableUnsecureFeatures() &&
- tikaServerConfig.getEndpoints().contains("async")) {
- final AsyncResource localAsyncResource =
- new AsyncResource(tikaServerConfig.getConfigPath());
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- localAsyncResource.shutdownNow();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }));
- asyncResource = localAsyncResource;
- }
-
- ServerDetails serverDetails = initServer(tikaServerConfig, asyncResource);
+ ServerDetails serverDetails = initServer(tikaServerConfig);
startServer(serverDetails);
} catch (Exception e) {
@@ -156,8 +141,7 @@ public class TikaServerProcess {
}
//This returns the server, configured and ready to be started.
- private static ServerDetails initServer(TikaServerConfig tikaServerConfig,
- AsyncResource asyncResource) throws Exception {
+ private static ServerDetails initServer(TikaServerConfig tikaServerConfig) throws Exception {
String host = tikaServerConfig.getHost();
int[] ports = tikaServerConfig.getPorts();
if (ports.length > 1) {
@@ -196,7 +180,6 @@ public class TikaServerProcess {
//TODO -- clean this up -- only load as necessary
FetcherManager fetcherManager = null;
- EmitterManager emitterManager = null;
InputStreamFactory inputStreamFactory = null;
if (tikaServerConfig.isEnableUnsecureFeatures()) {
fetcherManager = FetcherManager.load(tikaServerConfig.getConfigPath());
@@ -330,8 +313,14 @@ public class TikaServerProcess {
resourceProviders.add(new SingletonResourceProvider(new TikaParsers()));
resourceProviders.add(new SingletonResourceProvider(new TikaVersion()));
if (tikaServerConfig.isEnableUnsecureFeatures()) {
- addAsyncResource = true;
- addPipesResource = true;
+ //check to make sure there are both fetchers and emitters
+ //specified. It is possible that users may only specify fetchers
+ //for legacy endpoints.
+ if (tikaServerConfig.getSupportedFetchers().size() > 0 &&
+ tikaServerConfig.getSupportedEmitters().size() > 0) {
+ addAsyncResource = true;
+ addPipesResource = true;
+ }
resourceProviders
.add(new SingletonResourceProvider(new TikaServerStatus(serverStatus)));
}
@@ -370,7 +359,8 @@ public class TikaServerProcess {
}
if (addAsyncResource) {
- final AsyncResource localAsyncResource = new AsyncResource(tikaServerConfig.getConfigPath());
+ final AsyncResource localAsyncResource = new AsyncResource(
+ tikaServerConfig.getConfigPath(), tikaServerConfig.getSupportedFetchers());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
localAsyncResource.shutdownNow();
diff --git a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index 1ee5600..172c8c6 100644
--- a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -25,6 +25,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.POST;
@@ -48,7 +49,6 @@ import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetcher.FetcherManager;
@Path("/async")
public class AsyncResource {
@@ -57,14 +57,14 @@ public class AsyncResource {
long maxQueuePauseMs = 60000;
private final AsyncProcessor asyncProcessor;
- private final FetcherManager fetcherManager;
+ private final Set<String> supportedFetchers;
private final EmitterManager emitterManager;
private ArrayBlockingQueue<FetchEmitTuple> queue;
- public AsyncResource(java.nio.file.Path tikaConfigPath)
+ public AsyncResource(java.nio.file.Path tikaConfigPath, Set<String> supportedFetchers)
throws TikaException, IOException, SAXException {
this.asyncProcessor = new AsyncProcessor(tikaConfigPath);
- this.fetcherManager = FetcherManager.load(tikaConfigPath);
+ this.supportedFetchers = supportedFetchers;
this.emitterManager = EmitterManager.load(tikaConfigPath);
}
@@ -104,7 +104,7 @@ public class AsyncResource {
//the requested fetchers and emitters
//throw early
for (FetchEmitTuple t : request.getTuples()) {
- if (!fetcherManager.getSupported().contains(t.getFetchKey().getFetcherName())) {
+ if (!supportedFetchers.contains(t.getFetchKey().getFetcherName())) {
return badFetcher(t.getFetchKey());
}
if (!emitterManager.getSupported().contains(t.getEmitKey().getEmitterName())) {
diff --git a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
index a67788d..b7e8f29 100644
--- a/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
+++ b/tika-server/tika-server-core/src/test/java/org/apache/tika/server/core/TikaServerConfigTest.java
@@ -19,6 +19,8 @@ package org.apache.tika.server.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
@@ -37,8 +39,10 @@ public class TikaServerConfigTest {
Set<String> settings = new HashSet<>();
CommandLineParser parser = new DefaultParser();
CommandLine emptyCommandLine = parser.parse(new Options(), new String[]{});
+ Path path = Paths.get(TikaConfigTest.class.getResource(
+ "/configs/tika-config-server.xml").toURI());
TikaServerConfig config = TikaServerConfig
- .load(TikaConfigTest.class.getResourceAsStream("/configs/tika-config-server.xml"),
+ .load(path,
emptyCommandLine,
settings);
assertEquals(-1, config.getMaxRestarts());
@@ -48,4 +52,24 @@ public class TikaServerConfigTest {
assertTrue(settings.contains("taskTimeoutMillis"));
assertTrue(settings.contains("enableUnsecureFeatures"));
}
+
+ /**
+  * Loads a config that declares one fetcher and one emitter and checks that
+  * both the server settings and the supported fetcher/emitter names are read.
+  */
+ @Test
+ public void testSupportedFetchersEmitters() throws Exception {
+     Path configFile = Paths.get(TikaConfigTest.class.getResource(
+             "/configs/tika-config-server-fetchers-emitters.xml").toURI());
+     CommandLineParser cliParser = new DefaultParser();
+     CommandLine noArgs = cliParser.parse(new Options(), new String[]{});
+     Set<String> settings = new HashSet<>();
+
+     TikaServerConfig config = TikaServerConfig.load(configFile, noArgs, settings);
+
+     //server-level settings
+     assertEquals(-1, config.getMaxRestarts());
+     assertEquals(54321, config.getTaskTimeoutMillis());
+     assertTrue(config.isEnableUnsecureFeatures());
+
+     //exactly one fetcher and one emitter, with the expected names
+     Set<String> fetchers = config.getSupportedFetchers();
+     Set<String> emitters = config.getSupportedEmitters();
+     assertEquals(1, fetchers.size());
+     assertEquals(1, emitters.size());
+     assertTrue(fetchers.contains("fsf"));
+     assertTrue(emitters.contains("fse"));
+ }
}
diff --git a/tika-server/tika-server-core/src/test/resources/configs/tika-config-server-fetchers-emitters.xml b/tika-server/tika-server-core/src/test/resources/configs/tika-config-server-fetchers-emitters.xml
new file mode 100644
index 0000000..360c9e1
--- /dev/null
+++ b/tika-server/tika-server-core/src/test/resources/configs/tika-config-server-fetchers-emitters.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<properties>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+ <params>
+ <name>fsf</name>
+ <basePath>/somePathOrOther</basePath>
+ </params>
+ </fetcher>
+ </fetchers>
+ <emitters>
+ <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+ <params>
+ <name>fse</name>
+ <basePath>/path/or/other/extracts</basePath>
+ </params>
+ </emitter>
+ </emitters>
+ <server>
+ <params>
+ <port>9999</port>
+ <taskTimeoutMillis>54321</taskTimeoutMillis>
+ <enableUnsecureFeatures>true</enableUnsecureFeatures>
+ <maxFiles>20</maxFiles>
+ <forkedJvmArgs>
+ <arg>-Xmx2g</arg>
+ </forkedJvmArgs>
+ <endpoints>
+ <endpoint>rmeta</endpoint>
+ </endpoints>
+ </params>
+ </server>
+</properties>