You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/17 21:36:37 UTC

[tika] branch main updated (bc158d596 -> 4b114a31a)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


    from bc158d596 TIKA-3932 -- temporary workaround :(
     new 2528d468a CompositePipesReporter needs to close resources
     new 4b114a31a Improve closing resources in JDBCPipesReporter, update unit tests and rename columns to align with jdbc emitter

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/tika/pipes/CompositePipesReporter.java  | 22 ++++++++++
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 19 +++++++--
 .../reporters/jdbc/TestJDBCPipesReporter.java      | 47 +++++++++++++++++-----
 .../{ => configs}/tika-config-excludes.xml         |  2 +-
 .../{ => configs}/tika-config-includes.xml         |  2 +-
 5 files changed, 75 insertions(+), 17 deletions(-)
 rename tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/{ => configs}/tika-config-excludes.xml (96%)
 rename tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/{ => configs}/tika-config-includes.xml (96%)


[tika] 01/02: CompositePipesReporter needs to close resources

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 2528d468a73f29293ab069e7feef181ff53ea9d3
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 16:32:04 2022 -0500

    CompositePipesReporter needs to close resources
---
 .../apache/tika/pipes/CompositePipesReporter.java  | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
index 4f78b6be8..dfce28bb7 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.tika.pipes;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -76,4 +77,30 @@ public class CompositePipesReporter extends PipesReporter implements Initializab
             throw new TikaConfigException("must specify at least one pipes reporter");
         }
     }
+
+    /**
+     * Tries to close every component reporter, even if some of them fail.
+     * If any throw an IOException, the last one encountered is rethrown and
+     * each earlier one is chained to its successor as a suppressed exception.
+     *
+     * @throws IOException the last IOException thrown by a component reporter
+     */
+    @Override
+    public void close() throws IOException {
+        IOException ex = null;
+        for (PipesReporter pipesReporter : pipesReporters) {
+            try {
+                pipesReporter.close();
+            } catch (IOException e) {
+                if (ex != null) {
+                    //chain the earlier failure so it is not silently lost
+                    e.addSuppressed(ex);
+                }
+                ex = e;
+            }
+        }
+        if (ex != null) {
+            throw ex;
+        }
+    }
 }


[tika] 02/02: Improve closing resources in JDBCPipesReporter, update unit tests and rename columns to align with jdbc emitter

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 4b114a31aee19ce921be4151bf75e4212596754b
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 16:36:27 2022 -0500

    Improve closing resources in JDBCPipesReporter, update unit tests and rename columns to align with jdbc emitter
---
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 19 +++++++--
 .../reporters/jdbc/TestJDBCPipesReporter.java      | 47 +++++++++++++++++-----
 .../{ => configs}/tika-config-excludes.xml         |  2 +-
 .../{ => configs}/tika-config-includes.xml         |  2 +-
 4 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index f29c2f986..835b50ec1 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -60,8 +60,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
     private static final long MAX_WAIT_MILLIS = 120000;
 
     private String connectionString;
-    private ArrayBlockingQueue<KeyStatusPair> queue =
-            new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE);
+    private final ArrayBlockingQueue<KeyStatusPair> queue =
+            new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE);
     CompletableFuture<Void> reportWorkerFuture;
 
     @Override
@@ -186,6 +186,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                     return;
                 }
                 if (p == KeyStatusPair.END_SEMAPHORE) {
+                    LOG.trace("received end semaphore");
                     try {
                         reportNow();
                     } catch (SQLException e) {
@@ -193,6 +194,15 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                     } catch (InterruptedException e) {
                         return;
                     }
+                    LOG.trace("about to close");
+                    try {
+                        insert.close();
+                        connection.close();
+                        LOG.trace("successfully closed resources");
+                    } catch (SQLException e) {
+                        LOG.warn("problem shutting down reporter", e);
+                    }
+
                     return;
                 }
                 cache.add(p);
@@ -220,6 +230,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                         insert.addBatch();
                     }
                     insert.executeBatch();
+                    LOG.debug("writing {}", cache.size());
                     cache.clear();
                     return;
                 } catch (SQLException e) {
@@ -233,7 +244,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             try (Statement st = connection.createStatement()) {
                 String sql = "drop table if exists " + TABLE_NAME;
                 st.execute(sql);
-                sql = "create table " + TABLE_NAME + " (emit_key varchar(512), status varchar(32))";
+                sql = "create table " + TABLE_NAME + " (path varchar(1024), status varchar(32))";
                 st.execute(sql);
             }
         }
@@ -262,7 +273,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
 
         private void createPreparedStatement() throws SQLException {
-            String sql = "insert into " + TABLE_NAME + " (emit_key, status) values (?,?)";
+            String sql = "insert into " + TABLE_NAME + " (path, status) values (?,?)";
             insert = connection.prepareStatement(sql);
         }
     }
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
index 188e262a9..75f9fe36e 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -22,8 +22,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -42,7 +44,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.PipesReporter;
@@ -55,16 +59,20 @@ import org.apache.tika.pipes.pipesiterator.TotalCountResult;
 public class TestJDBCPipesReporter {
 
     @Test
-    public void testBasic() throws Exception {
+    public void testBasic(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
         int numThreads = 10;
         int numIterations = 200;
-        String connectionString = "jdbc:h2:mem:test_tika";
         JDBCPipesReporter reporter = new JDBCPipesReporter();
         reporter.setConnection(connectionString);
         reporter.initialize(new HashMap<>());
 
         Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
         reporter.close();
+
         Map<PipesResult.STATUS, Long> total = countReported(connectionString);
         assertEquals(expected.size(), total.size());
         long sum = 0;
@@ -77,13 +85,19 @@ public class TestJDBCPipesReporter {
     }
 
     @Test
-    public void testIncludes() throws Exception {
-        Path p = Paths.get(this.getClass().getResource("/tika-config-includes.xml").toURI());
-        AsyncConfig asyncConfig = AsyncConfig.load(p);
+    public void testIncludes(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-includes.xml",
+                connectionString, config);
+
+        AsyncConfig asyncConfig = AsyncConfig.load(config);
         PipesReporter reporter = asyncConfig.getPipesReporter();
         int numThreads = 10;
         int numIterations = 200;
-        String connectionString = "jdbc:h2:mem:test_tika";
 
         Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
         reporter.close();
@@ -103,13 +117,18 @@ public class TestJDBCPipesReporter {
     }
 
     @Test
-    public void testExcludes() throws Exception {
-        Path p = Paths.get(this.getClass().getResource("/tika-config-excludes.xml").toURI());
-        AsyncConfig asyncConfig = AsyncConfig.load(p);
+    public void testExcludes(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-excludes.xml",
+                connectionString, config);
+        AsyncConfig asyncConfig = AsyncConfig.load(config);
         PipesReporter reporter = asyncConfig.getPipesReporter();
         int numThreads = 10;
         int numIterations = 200;
-        String connectionString = "jdbc:h2:mem:test_tika";
 
         Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
         reporter.close();
@@ -235,4 +254,10 @@ public class TestJDBCPipesReporter {
             return written;
         }
     }
+
+    private void writeConfig(String srcConfig, String dbDir, Path config) throws IOException {
+        String template = IOUtils.resourceToString(srcConfig, StandardCharsets.UTF_8);
+        String resolved = template.replace("CONNECTION_STRING", dbDir);
+        Files.write(config, resolved.getBytes(StandardCharsets.UTF_8));
+    }
 }
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
similarity index 96%
rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
index f35b27952..ab7682237 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
@@ -35,7 +35,7 @@
   </params>
   <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
     <params>
-      <connection>jdbc:h2:mem:test_tika</connection>
+      <connection>CONNECTION_STRING</connection>
       <excludes>
         <exclude>PARSE_SUCCESS</exclude>
         <exclude>PARSE_SUCCESS_WITH_EXCEPTION</exclude>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
similarity index 96%
rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
index fa7c74fcf..1c3c68663 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
@@ -35,7 +35,7 @@
   </params>
   <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
     <params>
-      <connection>jdbc:h2:mem:test_tika</connection>
+      <connection>CONNECTION_STRING</connection>
       <includes>
         <include>PARSE_SUCCESS</include>
         <include>PARSE_SUCCESS_WITH_EXCEPTION</include>