You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2021/07/29 16:35:52 UTC

[tika] branch main updated: TIKA-3507 -- add PipesReporter

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 51b538b  TIKA-3507 -- add PipesReporter
51b538b is described below

commit 51b538bfacada18384e94bd68b7de6357106123d
Author: tallison <ta...@apache.org>
AuthorDate: Thu Jul 29 12:33:56 2021 -0400

    TIKA-3507 -- add PipesReporter
---
 .../java/org/apache/tika/config/ConfigBase.java    | 28 ++++++++++++-
 .../java/org/apache/tika/pipes/PipesReporter.java  | 29 ++++++++++++++
 .../org/apache/tika/pipes/async/AsyncConfig.java   | 11 ++++++
 .../apache/tika/pipes/async/AsyncProcessor.java    |  3 ++
 .../org/apache/tika/pipes/async/MockReporter.java  | 46 ++++++++++++++++++++++
 .../apache/tika/pipes/async/MockReporterTest.java  | 39 ++++++++++++++++++
 .../org/apache/tika/pipes/async/TIKA-3507.xml      | 34 ++++++++++++++++
 7 files changed, 189 insertions(+), 1 deletion(-)

diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index 20f110c..c68e509 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -198,7 +198,33 @@ public abstract class ConfigBase {
             Node child = children.item(i);
             if ("params".equals(child.getLocalName())) {
                 params = child.getChildNodes();
-                break;
+            } else if (child.getNodeType() == 1 && ! child.getLocalName().equals(exceptNodeName)) {
+                String itemName = child.getLocalName();
+                String setter = "set" + itemName.substring(0, 1).toUpperCase(Locale.US) +
+                        itemName.substring(1);
+                Class itemClass = null;
+                Method setterMethod = null;
+                for (Method method : object.getClass().getMethods()) {
+                    if (setter.equals(method.getName())) {
+                        Class<?>[] classes = method.getParameterTypes();
+                        if (classes.length == 1) {
+                            itemClass = classes[0];
+                            setterMethod = method;
+                            break;
+                        }
+                    }
+                }
+                if (itemClass == null) {
+                    throw new TikaConfigException("Couldn't find setter '" +
+                            setter + "' for " + itemName);
+                }
+                Object item = buildClass(child, itemName, itemClass);
+                setParams(itemClass.cast(item), child, new HashSet<>());
+                try {
+                    setterMethod.invoke(object, item);
+                } catch (IllegalAccessException | InvocationTargetException e) {
+                    throw new TikaConfigException("problem creating " + itemName, e);
+                }
             }
         }
         if (params != null) {
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
new file mode 100644
index 0000000..264a6f0
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+public abstract class PipesReporter {
+
+    public static PipesReporter NO_OP_REPORTER = new PipesReporter() {
+        @Override
+        public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+
+        }
+    };
+
+    public abstract void report(FetchEmitTuple t, PipesResult result, long elapsed);
+}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
index c2d5ed8..51b52af 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java
@@ -23,6 +23,7 @@ import java.nio.file.Path;
 
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.pipes.PipesConfigBase;
+import org.apache.tika.pipes.PipesReporter;
 
 public class AsyncConfig extends PipesConfigBase {
 
@@ -32,6 +33,8 @@ public class AsyncConfig extends PipesConfigBase {
     private int queueSize = 10000;
     private int numEmitters = 1;
 
+    private PipesReporter pipesReporter = PipesReporter.NO_OP_REPORTER;
+
     public static AsyncConfig load(Path p) throws IOException, TikaConfigException {
         AsyncConfig asyncConfig = new AsyncConfig();
         try (InputStream is = Files.newInputStream(p)) {
@@ -96,4 +99,12 @@ public class AsyncConfig extends PipesConfigBase {
     public int getNumEmitters() {
         return numEmitters;
     }
+
+    public PipesReporter getPipesReporter() {
+        return pipesReporter;
+    }
+
+    public void setPipesReporter(PipesReporter pipesReporter) {
+        this.pipesReporter = pipesReporter;
+    }
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index a717f62..62cc2d3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -194,6 +194,7 @@ public class AsyncProcessor implements Closeable {
                         return PARSER_FUTURE_CODE;
                     } else {
                         PipesResult result = null;
+                        long start = System.currentTimeMillis();
                         try {
                             result = pipesClient.process(t);
                         } catch (IOException e) {
@@ -203,6 +204,8 @@ public class AsyncProcessor implements Closeable {
                             //TODO -- add timeout, this currently hangs forever
                             emitDataQueue.offer(result.getEmitData());
                         }
+                        long elapsed = System.currentTimeMillis() - start;
+                        asyncConfig.getPipesReporter().report(t, result, elapsed);
                         totalProcessed.incrementAndGet();
                     }
                     checkActive();
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
new file mode 100644
index 0000000..112ace4
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesReporter;
+import org.apache.tika.pipes.PipesResult;
+
+public class MockReporter extends PipesReporter {
+
+    private String endpoint;
+
+    @Override
+    public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+
+    }
+
+    @Field
+    public void setEndpoint(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    @Override
+    public String toString() {
+        return "MockReporter{" + "endpoint='" + endpoint + '\'' + '}';
+    }
+}
diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
new file mode 100644
index 0000000..599ac47
--- /dev/null
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporterTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.async;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.PipesReporter;
+
+public class MockReporterTest {
+
+    @Test
+    public void testBasic() throws Exception {
+        Path configPath = Paths.get(this.getClass().getResource("TIKA-3507.xml").toURI());
+        AsyncConfig asyncConfig = AsyncConfig.load(configPath);
+        PipesReporter reporter = asyncConfig.getPipesReporter();
+        assertTrue(reporter instanceof MockReporter);
+        assertEquals("somethingOrOther", ((MockReporter)reporter).getEndpoint());
+    }
+}
diff --git a/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml b/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml
new file mode 100644
index 0000000..29e219a
--- /dev/null
+++ b/tika-core/src/test/resources/org/apache/tika/pipes/async/TIKA-3507.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+  <async>
+    <params>
+      <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+      <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+      <emitWithinMillis>60000</emitWithinMillis>
+      <numEmitters>1</numEmitters>
+    </params>
+    <pipesReporter class="org.apache.tika.pipes.async.MockReporter">
+      <params>
+        <endpoint>somethingOrOther</endpoint>
+      </params>
+    </pipesReporter>
+  </async>
+</properties>
\ No newline at end of file