You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/29 16:35:52 UTC
[tika] branch main updated: TIKA-3507 -- add PipesReporter
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 51b538b TIKA-3507 -- add PipesReporter
51b538b is described below
commit 51b538bfacada18384e94bd68b7de6357106123d
Author: tallison <ta...@apache.org>
AuthorDate: Thu Jul 29 12:33:56 2021 -0400
TIKA-3507 -- add PipesReporter
---
.../java/org/apache/tika/config/ConfigBase.java | 28 ++++++++++++-
.../java/org/apache/tika/pipes/PipesReporter.java | 29 ++++++++++++++
.../org/apache/tika/pipes/async/AsyncConfig.java | 11 ++++++
.../apache/tika/pipes/async/AsyncProcessor.java | 3 ++
.../org/apache/tika/pipes/async/MockReporter.java | 46 ++++++++++++++++++++++
.../apache/tika/pipes/async/MockReporterTest.java | 39 ++++++++++++++++++
.../org/apache/tika/pipes/async/TIKA-3507.xml | 34 ++++++++++++++++
7 files changed, 189 insertions(+), 1 deletion(-)
diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index 20f110c..c68e509 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -198,7 +198,33 @@ public abstract class ConfigBase {
Node child = children.item(i);
if ("params".equals(child.getLocalName())) {
params = child.getChildNodes();
- break;
+            } else if (child.getNodeType() == Node.ELEMENT_NODE &&
+                    !child.getLocalName().equals(exceptNodeName)) {
+                //any other element child is treated as a nested, configurable object that is
+                //handed to the matching single-argument bean setter, e.g.
+                //<pipesReporter class="..."> -> setPipesReporter(PipesReporter)
+                String itemName = child.getLocalName();
+                String setter = "set" + itemName.substring(0, 1).toUpperCase(Locale.US) +
+                        itemName.substring(1);
+                Class<?> itemClass = null;
+                Method setterMethod = null;
+                for (Method method : object.getClass().getMethods()) {
+                    if (setter.equals(method.getName())) {
+                        Class<?>[] classes = method.getParameterTypes();
+                        if (classes.length == 1) {
+                            itemClass = classes[0];
+                            setterMethod = method;
+                            break;
+                        }
+                    }
+                }
+                if (itemClass == null) {
+                    throw new TikaConfigException("Couldn't find setter '" +
+                            setter + "' for " + itemName);
+                }
+                //Class.cast() verifies at runtime that the built object is assignable to the
+                //setter's parameter type (it throws ClassCastException otherwise)
+                Object item = itemClass.cast(buildClass(child, itemName, itemClass));
+                //configure the nested object from its own <params> before injecting it
+                setParams(item, child, new HashSet<>());
+                try {
+                    setterMethod.invoke(object, item);
+                } catch (IllegalAccessException | InvocationTargetException e) {
+                    throw new TikaConfigException("problem creating " + itemName, e);
+                }
}
}
if (params != null) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
new file mode 100644
index 0000000..264a6f0
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+/**
+ * Callback that is notified once a {@link FetchEmitTuple} has been run through the
+ * pipes framework. Subclasses decide what to do with the information; the default
+ * {@link #NO_OP_REPORTER} ignores it.
+ */
+public abstract class PipesReporter {
+
+    /**
+     * Reporter that ignores every report; used as the default when no reporter is
+     * configured. Declared {@code final} so that the shared default cannot be replaced
+     * globally by unrelated code.
+     */
+    public static final PipesReporter NO_OP_REPORTER = new PipesReporter() {
+        @Override
+        public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+            //intentionally empty
+        }
+    };
+
+    /**
+     * Reports the outcome of processing a single tuple.
+     *
+     * @param t       the tuple that was processed
+     * @param result  the result of processing, if any; implementations should not
+     *                assume it is non-null
+     * @param elapsed time spent processing the tuple, in milliseconds
+     */
+    public abstract void report(FetchEmitTuple t, PipesResult result, long elapsed);
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index c2d5ed8..51b52af 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -23,6 +23,7 @@ import java.nio.file.Path;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.pipes.PipesConfigBase;
+import org.apache.tika.pipes.PipesReporter;
public class AsyncConfig extends PipesConfigBase {
@@ -32,6 +33,8 @@ public class AsyncConfig extends PipesConfigBase {
private int queueSize = 10000;
private int numEmitters = 1;
+ //reporter invoked after each tuple is processed; defaults to the no-op reporter until
+ //one is configured (e.g. via a <pipesReporter> element) or set programmatically
+ private PipesReporter pipesReporter = PipesReporter.NO_OP_REPORTER;
+
public static AsyncConfig load(Path p) throws IOException, TikaConfigException {
AsyncConfig asyncConfig = new AsyncConfig();
try (InputStream is = Files.newInputStream(p)) {
@@ -96,4 +99,12 @@ public class AsyncConfig extends PipesConfigBase {
public int getNumEmitters() {
return numEmitters;
}
+
+    /**
+     * Returns the reporter that should be notified after each tuple is processed.
+     *
+     * @return the configured reporter; the no-op reporter unless another was configured
+     */
+    public PipesReporter getPipesReporter() {
+        return this.pipesReporter;
+    }
+
+    /**
+     * Sets the reporter to notify after each tuple is processed.
+     *
+     * @param pipesReporter the reporter to use; {@code null} is replaced with
+     *                      {@link PipesReporter#NO_OP_REPORTER}
+     */
+    public void setPipesReporter(PipesReporter pipesReporter) {
+        //AsyncProcessor calls getPipesReporter().report(...) for every tuple without a
+        //null check, so never store null here
+        this.pipesReporter = pipesReporter == null ? PipesReporter.NO_OP_REPORTER : pipesReporter;
+    }
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index a717f62..62cc2d3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -194,6 +194,7 @@ public class AsyncProcessor implements Closeable {
return PARSER_FUTURE_CODE;
} else {
PipesResult result = null;
+                //nanoTime is monotonic, unlike currentTimeMillis, so the measured duration is
+                //not skewed by wall-clock adjustments (NTP corrections, manual clock changes)
+                long start = System.nanoTime();
try {
result = pipesClient.process(t);
} catch (IOException e) {
@@ -203,6 +204,8 @@ public class AsyncProcessor implements Closeable {
//TODO -- add timeout, this currently hangs forever
emitDataQueue.offer(result.getEmitData());
}
+                //PipesReporter expects milliseconds
+                long elapsed = (System.nanoTime() - start) / 1_000_000L;
+                asyncConfig.getPipesReporter().report(t, result, elapsed);
totalProcessed.incrementAndGet();
}
checkActive();
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
new file mode 100644
index 0000000..112ace4
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesReporter;
+import org.apache.tika.pipes.PipesResult;
+
+/**
+ * Test-only {@link PipesReporter} that discards every report and merely exposes the
+ * {@code endpoint} value it was configured with, so tests can check that a nested
+ * reporter element and its params are applied.
+ */
+public class MockReporter extends PipesReporter {
+
+    /** Value injected from the {@code <endpoint>} param; {@code null} until set. */
+    private String endpoint;
+
+    /**
+     * @return the configured endpoint, or {@code null} if none was set
+     */
+    public String getEndpoint() {
+        return this.endpoint;
+    }
+
+    /**
+     * Stores the endpoint; marked with {@link Field} for Tika's config loading.
+     *
+     * @param endpoint the endpoint value from the config
+     */
+    @Field
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /** Intentionally does nothing: this mock exists to verify configuration only. */
+    @Override
+    public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+        //nothing to record
+    }
+
+    @Override
+    public String toString() {
+        return "MockReporter{" + "endpoint='" + endpoint + '\'' + '}';
+    }
+}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
new file mode 100644
index 0000000..599ac47
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.PipesReporter;
+
+/**
+ * Verifies that a {@code <pipesReporter>} element nested in the async config is
+ * instantiated with the declared class and configured from its {@code <params>}.
+ */
+public class MockReporterTest {
+
+    @Test
+    public void testBasic() throws Exception {
+        Path tikaConfig =
+                Paths.get(MockReporterTest.class.getResource("TIKA-3507.xml").toURI());
+        PipesReporter pipesReporter = AsyncConfig.load(tikaConfig).getPipesReporter();
+
+        assertTrue(pipesReporter instanceof MockReporter);
+        MockReporter mockReporter = (MockReporter) pipesReporter;
+        assertEquals("somethingOrOther", mockReporter.getEndpoint());
+    }
+}
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml b/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml
new file mode 100644
index 0000000..29e219a
--- /dev/null
+++ b/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<properties>
+ <async>
+ <params>
+ <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+ <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+ <emitWithinMillis>60000</emitWithinMillis>
+ <numEmitters>1</numEmitters>
+ </params>
+ <pipesReporter class="org.apache.tika.pipes.async.MockReporter">
+ <params>
+ <endpoint>somethingOrOther</endpoint>
+ </params>
+ </pipesReporter>
+ </async>
+</properties>
\ No newline at end of file