You are viewing a plain-text version of this content. (The canonical link to the original page was a hyperlink and is not preserved in this plain-text version.)
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/17 21:36:39 UTC

[tika] 02/02: Improve closing of resources in JDBCPipesReporter, update unit tests, and rename the emit_key column to path to align with the JDBC emitter

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 4b114a31aee19ce921be4151bf75e4212596754b
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 16:36:27 2022 -0500

    Improve closing of resources in JDBCPipesReporter, update unit tests, and rename the emit_key column to path to align with the JDBC emitter
---
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 19 +++++++--
 .../reporters/jdbc/TestJDBCPipesReporter.java      | 47 +++++++++++++++++-----
 .../{ => configs}/tika-config-excludes.xml         |  2 +-
 .../{ => configs}/tika-config-includes.xml         |  2 +-
 4 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index f29c2f986..835b50ec1 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -60,8 +60,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
     private static final long MAX_WAIT_MILLIS = 120000;
 
     private String connectionString;
-    private ArrayBlockingQueue<KeyStatusPair> queue =
-            new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE);
+    private final ArrayBlockingQueue<KeyStatusPair> queue =
+            new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE);
     CompletableFuture<Void> reportWorkerFuture;
 
     @Override
@@ -186,6 +186,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                     return;
                 }
                 if (p == KeyStatusPair.END_SEMAPHORE) {
+                    LOG.trace("received end semaphore");
                     try {
                         reportNow();
                     } catch (SQLException e) {
@@ -193,6 +194,15 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                     } catch (InterruptedException e) {
                         return;
                     }
+                    LOG.trace("about to close");
+                    try (Connection closingConnection = connection;
+                            PreparedStatement closingInsert = insert) {
+                        // closes insert, then connection, even if closing insert throws
+                    } catch (SQLException e) {
+                        LOG.warn("problem shutting down reporter", e);
+                        return;
+                    }
+                    LOG.trace("successfully closed resources");
                     return;
                 }
                 cache.add(p);
@@ -220,6 +230,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                         insert.addBatch();
                     }
                     insert.executeBatch();
+                    LOG.debug("writing {}", cache.size());
                     cache.clear();
                     return;
                 } catch (SQLException e) {
@@ -233,7 +244,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             try (Statement st = connection.createStatement()) {
                 String sql = "drop table if exists " + TABLE_NAME;
                 st.execute(sql);
-                sql = "create table " + TABLE_NAME + " (emit_key varchar(512), status varchar(32))";
+                sql = "create table " + TABLE_NAME + " (path varchar(1024), status varchar(32))";
                 st.execute(sql);
             }
         }
@@ -262,7 +273,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
 
         private void createPreparedStatement() throws SQLException {
-            String sql = "insert into " + TABLE_NAME + " (emit_key, status) values (?,?)";
+            String sql = "insert into " + TABLE_NAME + " (path, status) values (?,?)";
             insert = connection.prepareStatement(sql);
         }
     }
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
index 188e262a9..75f9fe36e 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -22,8 +22,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
@@ -42,7 +44,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.PipesReporter;
@@ -55,16 +59,20 @@ import org.apache.tika.pipes.pipesiterator.TotalCountResult;
 public class TestJDBCPipesReporter {
 
     @Test
-    public void testBasic() throws Exception {
+    public void testBasic(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
         int numThreads = 10;
         int numIterations = 200;
-        String connectionString = "jdbc:h2:mem:test_tika";
         JDBCPipesReporter reporter = new JDBCPipesReporter();
         reporter.setConnection(connectionString);
         reporter.initialize(new HashMap<>());
 
         Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
         reporter.close();
+
         Map<PipesResult.STATUS, Long> total = countReported(connectionString);
         assertEquals(expected.size(), total.size());
         long sum = 0;
@@ -77,13 +85,19 @@ public class TestJDBCPipesReporter {
     }
 
     @Test
-    public void testIncludes() throws Exception {
-        Path p = Paths.get(this.getClass().getResource("/tika-config-includes.xml").toURI());
-        AsyncConfig asyncConfig = AsyncConfig.load(p);
+    public void testIncludes(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-includes.xml",
+                connectionString, config);
+
+        AsyncConfig asyncConfig = AsyncConfig.load(config);
         PipesReporter reporter = asyncConfig.getPipesReporter();
         int numThreads = 10;
         int numIterations = 200;
-        String connectionString = "jdbc:h2:mem:test_tika";
 
         Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
         reporter.close();
@@ -103,13 +117,18 @@ public class TestJDBCPipesReporter {
     }
 
     @Test
-    public void testExcludes() throws Exception {
-        Path p = Paths.get(this.getClass().getResource("/tika-config-excludes.xml").toURI());
-        AsyncConfig asyncConfig = AsyncConfig.load(p);
+    public void testExcludes(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-excludes.xml",
+                connectionString, config);
+        AsyncConfig asyncConfig = AsyncConfig.load(config);
         PipesReporter reporter = asyncConfig.getPipesReporter();
         int numThreads = 10;
         int numIterations = 200;
-        String connectionString = "jdbc:h2:mem:test_tika";
 
         Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
         reporter.close();
@@ -235,4 +254,10 @@ public class TestJDBCPipesReporter {
             return written;
         }
     }
+
+    private void writeConfig(String srcConfig, String connection, Path config) throws IOException {
+        String xml = IOUtils.resourceToString(srcConfig, StandardCharsets.UTF_8);
+        xml = xml.replace("CONNECTION_STRING", connection);
+        Files.write(config, xml.getBytes(StandardCharsets.UTF_8));
+    }
 }
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
similarity index 96%
rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
index f35b27952..ab7682237 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
@@ -35,7 +35,7 @@
   </params>
   <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
     <params>
-      <connection>jdbc:h2:mem:test_tika</connection>
+      <connection>CONNECTION_STRING</connection>
       <excludes>
         <exclude>PARSE_SUCCESS</exclude>
         <exclude>PARSE_SUCCESS_WITH_EXCEPTION</exclude>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
similarity index 96%
rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
index fa7c74fcf..1c3c68663 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
@@ -35,7 +35,7 @@
   </params>
   <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
     <params>
-      <connection>jdbc:h2:mem:test_tika</connection>
+      <connection>CONNECTION_STRING</connection>
       <includes>
         <include>PARSE_SUCCESS</include>
         <include>PARSE_SUCCESS_WITH_EXCEPTION</include>