You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/17 18:58:15 UTC

[tika] branch main updated: TIKA-3931 (#803)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new f846009d6 TIKA-3931 (#803)
f846009d6 is described below

commit f846009d6e9b4789f8057b1b27fde7caa8d4cbda
Author: Tim Allison <ta...@apache.org>
AuthorDate: Thu Nov 17 13:58:08 2022 -0500

    TIKA-3931 (#803)
    
    * TIKA-3931 -- add a JDBCPipesReporter
    
    * TIKA-3931 -- update bom
---
 CHANGES.txt                                        |   2 +
 tika-bom/pom.xml                                   |  30 ++-
 .../java/org/apache/tika/pipes/PipesReporter.java  |   4 +-
 .../org/apache/tika/pipes/PipesReporterBase.java   | 155 ++++++++++++
 tika-pipes/tika-pipes-reporters/pom.xml            |   1 +
 .../tika-pipes-reporter-jdbc/pom.xml               | 116 +++++++++
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 270 +++++++++++++++++++++
 .../reporters/jdbc/TestJDBCPipesReporter.java      | 238 ++++++++++++++++++
 .../src/test/resources/tika-config-excludes.xml    |  46 ++++
 .../src/test/resources/tika-config-includes.xml    |  46 ++++
 10 files changed, 901 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e9b58ee10..54d14214e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,7 @@
 Release 2.6.1 - ???
 
+   * Add a JDBCPipesReporter (TIKA-3931).
+
    * Add multivalued field strategy option in jdbc-emitter (TIKA-3930).
      Default is now 'concatenate' with ', ' as the delimiter.
 
diff --git a/tika-bom/pom.xml b/tika-bom/pom.xml
index e8fcc606e..151dd3138 100644
--- a/tika-bom/pom.xml
+++ b/tika-bom/pom.xml
@@ -319,6 +319,16 @@
         <artifactId>tika-emitter-gcs</artifactId>
         <version>2.6.1-SNAPSHOT</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.tika</groupId>
+        <artifactId>tika-emitter-jdbc</artifactId>
+        <version>2.6.1-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tika</groupId>
+        <artifactId>tika-emitter-kafka</artifactId>
+        <version>2.6.1-SNAPSHOT</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.tika</groupId>
         <artifactId>tika-emitter-opensearch</artifactId>
@@ -331,15 +341,24 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tika</groupId>
-        <artifactId>tika-emitter-kafka</artifactId>
+        <artifactId>tika-emitter-solr</artifactId>
         <version>2.6.1-SNAPSHOT</version>
       </dependency>
       <dependency>
         <groupId>org.apache.tika</groupId>
-        <artifactId>tika-emitter-solr</artifactId>
+        <artifactId>tika-pipes-reporter-fs-status</artifactId>
+        <version>2.6.1-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tika</groupId>
+        <artifactId>tika-pipes-reporter-jdbc</artifactId>
+        <version>2.6.1-SNAPSHOT</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tika</groupId>
+        <artifactId>tika-pipes-reporter-opensearch</artifactId>
         <version>2.6.1-SNAPSHOT</version>
       </dependency>
-
       <dependency>
         <groupId>org.apache.tika</groupId>
         <artifactId>tika-fetcher-gcs</artifactId>
@@ -355,7 +374,6 @@
         <artifactId>tika-fetcher-s3</artifactId>
         <version>2.6.1-SNAPSHOT</version>
       </dependency>
-
       <dependency>
         <groupId>org.apache.tika</groupId>
         <artifactId>tika-pipes-iterator-csv</artifactId>
@@ -373,12 +391,12 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tika</groupId>
-        <artifactId>tika-pipes-iterator-s3</artifactId>
+        <artifactId>tika-pipes-iterator-kafka</artifactId>
         <version>2.6.1-SNAPSHOT</version>
       </dependency>
       <dependency>
         <groupId>org.apache.tika</groupId>
-        <artifactId>tika-pipes-iterator-kafka</artifactId>
+        <artifactId>tika-pipes-iterator-s3</artifactId>
         <version>2.6.1-SNAPSHOT</version>
       </dependency>
       <dependency>
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
index 18db3fe1d..3978039b4 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java
@@ -92,5 +92,7 @@ public abstract class PipesReporter implements Closeable {
      * This is called if the process has crashed.
      * Implementers should not rely on close() to be called after this.
      * @param msg
-     */public abstract void error(String msg);
+     */
+    public abstract void error(String msg);
+
 }
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java
new file mode 100644
index 000000000..3dcddfa71
--- /dev/null
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+
+/**
+ * Base class for {@link PipesReporter} implementations that filter what they report
+ * by {@link PipesResult.STATUS}.
+ * <p>
+ * Statuses are configured through {@link #setIncludes(List)} or {@link #setExcludes(List)};
+ * at most one of the two may be non-empty. The filter itself is built in
+ * {@link #initialize(Map)}, so subclasses that override {@code initialize} must call
+ * {@code super.initialize(params)}, and they must consult
+ * {@link #accept(PipesResult.STATUS)} themselves before reporting.
+ */
+public abstract class PipesReporterBase extends PipesReporter implements Initializable {
+
+    private final Set<PipesResult.STATUS> includes = new HashSet<>();
+    private final Set<PipesResult.STATUS> excludes = new HashSet<>();
+
+    //built in initialize(); null until then
+    private StatusFilter statusFilter;
+
+    /**
+     * Builds the status filter from whatever includes/excludes have been set so far.
+     *
+     * @param params not read by this implementation
+     * @throws TikaConfigException if both includes and excludes have contents
+     */
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        statusFilter = buildStatusFilter(includes, excludes);
+    }
+
+    /**
+     * Picks the filter implementation: includes-only, excludes-only, or accept-all when
+     * neither set has contents.
+     */
+    private StatusFilter buildStatusFilter(Set<PipesResult.STATUS> includes,
+                                           Set<PipesResult.STATUS> excludes) throws TikaConfigException {
+        if (!includes.isEmpty() && !excludes.isEmpty()) {
+            throw new TikaConfigException("Only one of includes and excludes may have any " +
+                    "contents");
+        }
+        if (!includes.isEmpty()) {
+            return new IncludesFilter(includes);
+        }
+        if (!excludes.isEmpty()) {
+            return new ExcludesFilter(excludes);
+        }
+        return new AcceptAllFilter();
+    }
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        //nothing to check
+    }
+
+    /**
+     * Implementations must call this for the includes/excludes filters to work!
+     *
+     * @param status the status of the result that is about to be reported
+     * @return <code>true</code> if a result with this status should be reported
+     * @throws IllegalStateException if {@link #initialize(Map)} has not been called yet
+     */
+    public boolean accept(PipesResult.STATUS status) {
+        if (statusFilter == null) {
+            //fail with an explanation rather than a bare NullPointerException
+            throw new IllegalStateException(
+                    "initialize() must be called before accept(); subclasses that override " +
+                            "initialize() must call super.initialize()");
+        }
+        return statusFilter.accept(status);
+    }
+
+    /**
+     * Adds statuses to the include list; only results with these statuses are accepted.
+     *
+     * @param includes names of {@link PipesResult.STATUS} constants
+     * @throws TikaConfigException if a name is not a {@link PipesResult.STATUS} constant
+     */
+    @Field
+    public void setIncludes(List<String> includes) throws TikaConfigException {
+        addStatuses(includes, this.includes);
+    }
+
+    /**
+     * Adds statuses to the exclude list; results with these statuses are rejected.
+     *
+     * @param excludes names of {@link PipesResult.STATUS} constants
+     * @throws TikaConfigException if a name is not a {@link PipesResult.STATUS} constant
+     */
+    @Field
+    public void setExcludes(List<String> excludes) throws TikaConfigException {
+        addStatuses(excludes, this.excludes);
+    }
+
+    /**
+     * Parses each name with {@link PipesResult.STATUS#valueOf(String)} and adds it to target.
+     */
+    private void addStatuses(List<String> names, Set<PipesResult.STATUS> target)
+            throws TikaConfigException {
+        for (String s : names) {
+            try {
+                target.add(PipesResult.STATUS.valueOf(s));
+            } catch (IllegalArgumentException e) {
+                String optionString = getOptionString();
+                throw new TikaConfigException(
+                        "I regret I don't recognize " + s + ". I only understand: " + optionString,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * @return the names of all {@link PipesResult.STATUS} constants, joined with ", "
+     */
+    private String getOptionString() {
+        StringBuilder sb = new StringBuilder();
+        for (PipesResult.STATUS status : PipesResult.STATUS.values()) {
+            if (sb.length() > 0) {
+                sb.append(", ");
+            }
+            sb.append(status.name());
+        }
+        return sb.toString();
+    }
+
+    private abstract static class StatusFilter {
+        abstract boolean accept(PipesResult.STATUS status);
+    }
+
+    /** Accepts only the statuses in the given set. */
+    private static class IncludesFilter extends StatusFilter {
+        private final Set<PipesResult.STATUS> includes;
+
+        private IncludesFilter(Set<PipesResult.STATUS> includes) {
+            this.includes = includes;
+        }
+
+        @Override
+        boolean accept(PipesResult.STATUS status) {
+            return includes.contains(status);
+        }
+    }
+
+    /** Accepts every status except those in the given set. */
+    private static class ExcludesFilter extends StatusFilter {
+        private final Set<PipesResult.STATUS> excludes;
+
+        ExcludesFilter(Set<PipesResult.STATUS> excludes) {
+            this.excludes = excludes;
+        }
+
+        @Override
+        boolean accept(PipesResult.STATUS status) {
+            return !excludes.contains(status);
+        }
+    }
+
+    /** Accepts every status. */
+    private static class AcceptAllFilter extends StatusFilter {
+
+        @Override
+        boolean accept(PipesResult.STATUS status) {
+            return true;
+        }
+    }
+
+}
diff --git a/tika-pipes/tika-pipes-reporters/pom.xml b/tika-pipes/tika-pipes-reporters/pom.xml
index d574a9d74..4f4c5e917 100644
--- a/tika-pipes/tika-pipes-reporters/pom.xml
+++ b/tika-pipes/tika-pipes-reporters/pom.xml
@@ -34,6 +34,7 @@
   <modules>
     <module>tika-pipes-reporter-opensearch</module>
     <module>tika-pipes-reporter-fs-status</module>
+    <module>tika-pipes-reporter-jdbc</module>
   </modules>
 
   <scm>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/pom.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/pom.xml
new file mode 100644
index 000000000..c2d990596
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.tika</groupId>
+    <artifactId>tika-pipes-reporters</artifactId>
+    <version>2.6.1-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tika-pipes-reporter-jdbc</artifactId>
+
+  <name>Apache Tika Pipes Reporter - JDBC Pipes Reporter</name>
+  <url>https://tika.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <Automatic-Module-Name>org.apache.tika.pipes.reporters.jdbc</Automatic-Module-Name>
+            </manifestEntries>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>${maven.shade.version}</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>
+                false
+              </createDependencyReducedPom>
+              <!-- <filters> -->
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*</exclude>
+                    <exclude>LICENSE.txt</exclude>
+                    <exclude>NOTICE.txt</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/LICENSE</resource>
+                  <file>target/classes/META-INF/LICENSE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/NOTICE</resource>
+                  <file>target/classes/META-INF/NOTICE</file>
+                </transformer>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                  <resource>META-INF/DEPENDENCIES</resource>
+                  <file>target/classes/META-INF/DEPENDENCIES</file>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <scm>
+    <tag>2.2.1-rc2</tag>
+  </scm>
+</project>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
new file mode 100644
index 000000000..f29c2f986
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.config.Param;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesReporterBase;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.utils.StringUtils;
+
+/**
+ * This is an initial draft of a JDBCPipesReporter.  This will drop
+ * the tika_status table with each run.  If you'd like different behavior,
+ * please open a ticket on our JIRA!
+ * <p>
+ * Calls to {@link #report(FetchEmitTuple, PipesResult, long)} put an emit key/status pair
+ * on a bounded queue; a background worker started in {@link #initialize(Map)} drains the
+ * queue and writes the pairs to the database in batches.
+ */
+public class JDBCPipesReporter extends PipesReporterBase implements Initializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCPipesReporter.class);
+    //number of pairs the worker collects before it writes a batch
+    private static final int CACHE_SIZE = 100;
+    private static final int ARRAY_BLOCKING_QUEUE_SIZE = 1000;
+
+    public static final String TABLE_NAME = "tika_status";
+
+    //how long report() waits for room in the queue
+    private static final long MAX_WAIT_MILLIS = 120000;
+
+    private String connectionString;
+    private final ArrayBlockingQueue<KeyStatusPair> queue =
+            new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE);
+    //null until initialize() has started the worker
+    CompletableFuture<Void> reportWorkerFuture;
+
+    /**
+     * Builds the status filter, connects to the database, (re)creates the status table and
+     * starts the background worker.
+     *
+     * @throws TikaConfigException if no connection string was set or the database setup fails
+     */
+    @Override
+    public void initialize(Map<String, Param> params) throws TikaConfigException {
+        super.initialize(params);
+        if (StringUtils.isBlank(connectionString)) {
+            throw new TikaConfigException("Must specify a connectionString");
+        }
+        ReportWorker reportWorker = new ReportWorker(connectionString, queue);
+        reportWorker.init();
+        reportWorkerFuture = CompletableFuture.runAsync(reportWorker);
+    }
+
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+
+    }
+
+    /**
+     * @param connection the JDBC connection string handed to {@link DriverManager}
+     */
+    @Field
+    public void setConnection(String connection) {
+        this.connectionString = connection;
+    }
+
+
+    /**
+     * Queues the emit key and status for the background worker if the status passes the
+     * includes/excludes filter. Waits up to {@code MAX_WAIT_MILLIS} for room in the queue;
+     * if there is still no room, the result is logged and dropped.
+     */
+    @Override
+    public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
+        if (! accept(result.getStatus())) {
+            return;
+        }
+        KeyStatusPair pair = new KeyStatusPair(t.getEmitKey().getEmitKey(), result.getStatus());
+        try {
+            //offer() returns false on timeout; don't lose the result silently
+            if (!queue.offer(pair, MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS)) {
+                LOG.warn("timed out waiting for room in the queue; not reporting: {}", pair);
+            }
+        } catch (InterruptedException e) {
+            //keep the interrupt visible to the caller
+            Thread.currentThread().interrupt();
+        }
+
+    }
+
+    @Override
+    public void error(Throwable t) {
+        LOG.error("reported error; all bets are off", t);
+    }
+
+    @Override
+    public void error(String msg) {
+        LOG.error("reported error; all bets are off: {}", msg);
+    }
+
+    /**
+     * Tells the worker to write whatever it still has cached and waits up to 60 seconds
+     * for it to finish.
+     *
+     * @throws RuntimeException if the worker ended with an exception
+     */
+    @Override
+    public void close() throws IOException {
+        if (reportWorkerFuture == null) {
+            //initialize() never started a worker; nothing to shut down
+            return;
+        }
+        try {
+            if (!queue.offer(KeyStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS)) {
+                LOG.warn("timed out adding the end marker to the queue");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return;
+        }
+
+        try {
+            reportWorkerFuture.get(60, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            LOG.error("problem closing", e);
+            throw new RuntimeException(e);
+        } catch (TimeoutException e) {
+            LOG.error("timeout closing", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            reportWorkerFuture.cancel(true);
+        }
+    }
+
+    /**
+     * An emit key and the status that was reported for it.
+     */
+    private static class KeyStatusPair {
+
+        //marker that tells the worker to flush and stop; compared by identity
+        static final KeyStatusPair END_SEMAPHORE = new KeyStatusPair(null, null);
+        private final String emitKey;
+        private final PipesResult.STATUS status;
+
+        public KeyStatusPair(String emitKey, PipesResult.STATUS status) {
+            this.emitKey = emitKey;
+            this.status = status;
+        }
+
+        @Override
+        public String toString() {
+            return "KeyStatusPair{" + "emitKey='" + emitKey + '\'' + ", status=" + status + '}';
+        }
+    }
+
+    /**
+     * Takes pairs off the queue and inserts them into the status table in batches.
+     * <p>
+     * NOTE(review): the connection and the prepared statement are never closed. The tests
+     * in this module read an in-memory H2 database after the reporter has been closed, and
+     * H2 drops an in-memory database when its last connection closes, so confirm how the
+     * tests should change before closing these here.
+     */
+    private static class ReportWorker implements Runnable {
+
+        private static final int MAX_TRIES = 3;
+        private static final long RECONNECT_SLEEP_MILLIS = 10000;
+        private final String connectionString;
+        private final ArrayBlockingQueue<KeyStatusPair> queue;
+        private final List<KeyStatusPair> cache = new ArrayList<>();
+        private Connection connection;
+        private PreparedStatement insert;
+
+        public ReportWorker(String connectionString, ArrayBlockingQueue<KeyStatusPair> queue) {
+            this.connectionString = connectionString;
+            this.queue = queue;
+        }
+
+        /**
+         * Connects, drops and recreates the status table, and prepares the insert statement.
+         *
+         * @throws TikaConfigException if any of those steps fails with a SQLException
+         */
+        public void init() throws TikaConfigException {
+            try {
+                createConnection();
+                createTable();
+                createPreparedStatement();
+            } catch (SQLException e) {
+                throw new TikaConfigException("Problem creating connection, etc", e);
+            }
+        }
+
+        /**
+         * Collects pairs until the end marker arrives. A batch is written whenever
+         * CACHE_SIZE pairs have been collected and once more on the end marker.
+         * Returns without writing if the thread is interrupted.
+         */
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    //blocking
+                    KeyStatusPair p = queue.take();
+                    if (p == KeyStatusPair.END_SEMAPHORE) {
+                        reportNow();
+                        return;
+                    }
+                    cache.add(p);
+                    if (cache.size() >= CACHE_SIZE) {
+                        reportNow();
+                    }
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Writes the cached pairs as one batch and clears the cache. On failure this
+         * reconnects and tries again, for at most MAX_TRIES attempts in total.
+         *
+         * @throws SQLException if the last attempt fails or reconnecting fails
+         */
+        private void reportNow() throws SQLException, InterruptedException {
+            if (cache.isEmpty()) {
+                return;
+            }
+            int attempt = 0;
+            while (true) {
+                attempt++;
+                try {
+                    for (KeyStatusPair p : cache) {
+                        insert.clearParameters();
+                        insert.setString(1, p.emitKey);
+                        insert.setString(2, p.status.name());
+                        insert.addBatch();
+                    }
+                    insert.executeBatch();
+                    cache.clear();
+                    return;
+                } catch (SQLException e) {
+                    if (attempt >= MAX_TRIES) {
+                        //out of attempts: surface the failure instead of dropping the batch
+                        throw e;
+                    }
+                    LOG.warn("problem writing to the db. Will try to reconnect", e);
+                    //reconnect() also creates a fresh statement, so the failed batch is gone
+                    reconnect();
+                }
+            }
+        }
+
+        private void createTable() throws SQLException {
+            try (Statement st = connection.createStatement()) {
+                String sql = "drop table if exists " + TABLE_NAME;
+                st.execute(sql);
+                sql = "create table " + TABLE_NAME + " (emit_key varchar(512), status varchar(32))";
+                st.execute(sql);
+            }
+        }
+
+        /**
+         * Opens a new connection and prepares a new insert statement, trying at most
+         * MAX_TRIES times and sleeping between failed attempts.
+         *
+         * @throws SQLException the failure of the last attempt
+         */
+        private void reconnect() throws SQLException, InterruptedException {
+            int attempts = 0;
+            while (true) {
+                attempts++;
+                try {
+                    createConnection();
+                    createPreparedStatement();
+                    return;
+                } catch (SQLException e) {
+                    LOG.warn("problem reconnecting", e);
+                    if (attempts >= MAX_TRIES) {
+                        throw e;
+                    }
+                    //if there's a failure, wait 10 seconds
+                    //and hope the db is back up.
+                    Thread.sleep(RECONNECT_SLEEP_MILLIS);
+                }
+            }
+        }
+
+        private void createConnection() throws SQLException {
+            connection = DriverManager.getConnection(connectionString);
+        }
+
+        private void createPreparedStatement() throws SQLException {
+            String sql = "insert into " + TABLE_NAME + " (emit_key, status) values (?,?)";
+            insert = connection.prepareStatement(sql);
+        }
+    }
+
+}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
new file mode 100644
index 000000000..188e262a9
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.reporters.jdbc;
+
+import static org.apache.tika.pipes.PipesResult.STATUS.PARSE_SUCCESS;
+import static org.apache.tika.pipes.PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.PipesReporter;
+import org.apache.tika.pipes.PipesResult;
+import org.apache.tika.pipes.async.AsyncConfig;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.pipesiterator.TotalCountResult;
+
+public class TestJDBCPipesReporter {
+
+    @Test
+    public void testBasic() throws Exception {
+        final int threads = 10;
+        final int iterations = 200;
+        final String jdbcUrl = "jdbc:h2:mem:test_tika";
+
+        //configured in code with no includes/excludes, so every status should be written
+        JDBCPipesReporter reporter = new JDBCPipesReporter();
+        reporter.setConnection(jdbcUrl);
+        reporter.initialize(new HashMap<>());
+
+        Map<PipesResult.STATUS, Long> expected = runBatch(reporter, threads, iterations);
+        reporter.close();
+
+        Map<PipesResult.STATUS, Long> reported = countReported(jdbcUrl);
+        assertEquals(expected.size(), reported.size());
+        long sum = 0;
+        for (Map.Entry<PipesResult.STATUS, Long> entry : expected.entrySet()) {
+            PipesResult.STATUS status = entry.getKey();
+            Long expectedCount = entry.getValue();
+            assertTrue(reported.containsKey(status), status.toString());
+            assertEquals(expectedCount, reported.get(status), status.toString());
+            sum += expectedCount;
+        }
+        //every generated result is accounted for
+        assertEquals(threads * iterations, sum);
+    }
+
+    @Test
+    public void testIncludes() throws Exception {
+        final int threads = 10;
+        final int iterations = 200;
+        //presumably the same database that tika-config-includes.xml points the reporter at
+        final String jdbcUrl = "jdbc:h2:mem:test_tika";
+
+        Path configPath =
+                Paths.get(this.getClass().getResource("/tika-config-includes.xml").toURI());
+        PipesReporter reporter = AsyncConfig.load(configPath).getPipesReporter();
+
+        Map<PipesResult.STATUS, Long> expected = runBatch(reporter, threads, iterations);
+        reporter.close();
+
+        Map<PipesResult.STATUS, Long> reported = countReported(jdbcUrl);
+        //only the two statuses checked below are expected in the table
+        assertEquals(2, reported.size());
+        long sum = 0;
+        for (Map.Entry<PipesResult.STATUS, Long> entry : expected.entrySet()) {
+            PipesResult.STATUS status = entry.getKey();
+            boolean included =
+                    status == PARSE_SUCCESS || status == PARSE_SUCCESS_WITH_EXCEPTION;
+            if (included) {
+                assertTrue(reported.containsKey(status), status.toString());
+                assertEquals(entry.getValue(), reported.get(status), status.toString());
+            } else {
+                assertFalse(reported.containsKey(status), status.toString());
+            }
+            sum += entry.getValue();
+        }
+        assertEquals(threads * iterations, sum);
+    }
+
+    @Test
+    public void testExcludes() throws Exception {
+        final int threads = 10;
+        final int iterations = 200;
+        //presumably the same database that tika-config-excludes.xml points the reporter at
+        final String jdbcUrl = "jdbc:h2:mem:test_tika";
+
+        Path configPath =
+                Paths.get(this.getClass().getResource("/tika-config-excludes.xml").toURI());
+        PipesReporter reporter = AsyncConfig.load(configPath).getPipesReporter();
+
+        Map<PipesResult.STATUS, Long> expected = runBatch(reporter, threads, iterations);
+        reporter.close();
+
+        Map<PipesResult.STATUS, Long> reported = countReported(jdbcUrl);
+        assertEquals(15, reported.size());
+        long sum = 0;
+        for (Map.Entry<PipesResult.STATUS, Long> entry : expected.entrySet()) {
+            PipesResult.STATUS status = entry.getKey();
+            boolean excluded =
+                    status == PARSE_SUCCESS || status == PARSE_SUCCESS_WITH_EXCEPTION;
+            if (excluded) {
+                assertFalse(reported.containsKey(status), status.toString());
+            } else {
+                assertTrue(reported.containsKey(status), status.toString());
+                assertEquals(entry.getValue(), reported.get(status), status.toString());
+            }
+            sum += entry.getValue();
+        }
+        assertEquals(threads * iterations, sum);
+    }
+
+
+    /**
+     * Tallies the rows of the {@code tika_status} table by status.
+     *
+     * @param connectionString JDBC connection string of the database to query
+     * @return the number of rows found per status; statuses without rows are absent
+     * @throws SQLException if the connection or the query fails
+     */
+    private Map<PipesResult.STATUS, Long> countReported(String connectionString) throws
+            SQLException {
+        Map<PipesResult.STATUS, Long> counts = new HashMap<>();
+        String sql = "select * from tika_status";
+        try (Connection connection = DriverManager.getConnection(connectionString);
+                Statement st = connection.createStatement();
+                ResultSet rs = st.executeQuery(sql)) {
+            while (rs.next()) {
+                // the status name is read from the second column of each row
+                PipesResult.STATUS status = PipesResult.STATUS.valueOf(rs.getString(2));
+                counts.merge(status, 1L, Long::sum);
+            }
+        }
+        return counts;
+    }
+
+    /**
+     * Starts {@code numThreads} workers that each report {@code numIterations}
+     * randomly chosen statuses to {@code reporter}, waits for all of them to finish
+     * and returns the combined number of reports sent per status.
+     *
+     * @param reporter      reporter shared by all workers
+     * @param numThreads    number of concurrent workers
+     * @param numIterations number of results each worker reports
+     * @return total number of results sent for each status across all workers
+     * @throws ExecutionException   if a worker fails
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private Map<PipesResult.STATUS, Long> runBatch(PipesReporter reporter,
+                                                   int numThreads,
+                                                   int numIterations)
+            throws ExecutionException, InterruptedException {
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        try {
+            ExecutorCompletionService<Integer> executorCompletionService =
+                    new ExecutorCompletionService<>(executorService);
+            List<ReportWorker> workerList = new ArrayList<>();
+            for (int i = 0; i < numThreads; i++) {
+                ReportWorker reportWorker = new ReportWorker(reporter, numIterations);
+                workerList.add(reportWorker);
+                executorCompletionService.submit(reportWorker);
+            }
+
+            for (int finished = 0; finished < numThreads; finished++) {
+                // take() blocks until the next worker completes instead of spinning on poll()
+                Future<Integer> future = executorCompletionService.take();
+                // get() rethrows any worker failure as an ExecutionException
+                future.get();
+            }
+
+            Map<PipesResult.STATUS, Long> total = new HashMap<>();
+            for (ReportWorker r : workerList) {
+                for (Map.Entry<PipesResult.STATUS, Long> e : r.getWritten().entrySet()) {
+                    total.merge(e.getKey(), e.getValue(), Long::sum);
+                }
+            }
+            return total;
+        } finally {
+            // always release the pool threads, also when a worker failed
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Worker that sends {@code numIterations} results with randomly chosen statuses
+     * to a reporter and keeps a per-status tally of what it sent.
+     */
+    private static class ReportWorker implements Callable<Integer> {
+        // shared by all workers so that every generated id is distinct
+        private static final AtomicInteger TOTAL_ADDED = new AtomicInteger(0);
+        private final Map<PipesResult.STATUS, Long> written = new HashMap<>();
+        private final PipesReporter reporter;
+        private final int numIterations;
+
+        private ReportWorker(PipesReporter reporter, int numIterations) {
+            this.reporter = reporter;
+            this.numIterations = numIterations;
+        }
+
+        /**
+         * Reports the configured number of randomly chosen statuses and, on every
+         * 100th iteration (including the first), sleeps briefly and sends an interim
+         * total-count update.
+         *
+         * @return always {@code 1}
+         */
+        @Override
+        public Integer call() throws Exception {
+            PipesResult.STATUS[] statuses = PipesResult.STATUS.values();
+            Random random = new Random();
+            for (int i = 0; i < numIterations; i++) {
+                PipesResult.STATUS status = statuses[random.nextInt(statuses.length)];
+                PipesResult pipesResult = new PipesResult(status);
+                String id = "id " + TOTAL_ADDED.getAndIncrement();
+                FetchEmitTuple t = new FetchEmitTuple(id,
+                        new FetchKey("fetcher", "fetchKey"),
+                        new EmitKey("emitter", id)
+                );
+
+                reporter.report(t, pipesResult, 100L);
+                written.merge(status, 1L, Long::sum);
+                if (i % 100 == 0) {
+                    Thread.sleep(94);
+                    long interimTotal = Math.round(100 + (double) i / 1000);
+                    reporter.report(new TotalCountResult(interimTotal,
+                            TotalCountResult.STATUS.NOT_COMPLETED));
+                }
+            }
+            return 1;
+        }
+
+        /**
+         * Returns the per-status tally of results this worker has sent. The map is
+         * not synchronized, so it should only be read once {@link #call()} has finished.
+         */
+        Map<PipesResult.STATUS, Long> getWritten() {
+            return written;
+        }
+    }
+}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
new file mode 100644
index 000000000..f35b27952
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+  <async>
+  <params>
+    <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+    <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+    <emitWithinMillis>60000</emitWithinMillis>
+    <numEmitters>1</numEmitters>
+    <numClients>3</numClients>
+    <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+    <forkedJvmArgs>
+      <arg>-Xmx512m</arg>
+      <arg>-XX:ParallelGCThreads=2</arg>
+      <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+    </forkedJvmArgs>
+    <timeoutMillis>60000</timeoutMillis>
+  </params>
+  <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
+    <params> <!-- TestJDBCPipesReporter.testExcludes expects the two excluded statuses below to be absent from the status table -->
+      <connection>jdbc:h2:mem:test_tika</connection> <!-- testExcludes queries this same connection string to count the stored rows -->
+      <excludes>
+        <exclude>PARSE_SUCCESS</exclude>
+        <exclude>PARSE_SUCCESS_WITH_EXCEPTION</exclude>
+      </excludes>
+    </params>
+  </pipesReporter>
+</async>
+</properties>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
new file mode 100644
index 000000000..fa7c74fcf
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+  <async>
+  <params>
+    <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+    <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+    <emitWithinMillis>60000</emitWithinMillis>
+    <numEmitters>1</numEmitters>
+    <numClients>3</numClients>
+    <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+    <forkedJvmArgs>
+      <arg>-Xmx512m</arg>
+      <arg>-XX:ParallelGCThreads=2</arg>
+      <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+    </forkedJvmArgs>
+    <timeoutMillis>60000</timeoutMillis>
+  </params>
+  <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
+    <params> <!-- NOTE(review): presumably loaded by the includes test in TestJDBCPipesReporter, which expects only the two statuses below in the status table; confirm -->
+      <connection>jdbc:h2:mem:test_tika</connection> <!-- same in-memory H2 connection string as tika-config-excludes.xml -->
+      <includes>
+        <include>PARSE_SUCCESS</include>
+        <include>PARSE_SUCCESS_WITH_EXCEPTION</include>
+      </includes>
+    </params>
+  </pipesReporter>
+</async>
+</properties>