You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") on the original page and is not preserved in this text.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/17 21:36:39 UTC
[tika] 02/02: Improve closing of resources in JDBCPipesReporter, update unit tests, and rename columns to align with the JDBC emitter
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 4b114a31aee19ce921be4151bf75e4212596754b
Author: tballison <ta...@apache.org>
AuthorDate: Thu Nov 17 16:36:27 2022 -0500
Improve closing of resources in JDBCPipesReporter, update unit tests, and rename columns to align with the JDBC emitter
---
.../pipes/reporters/jdbc/JDBCPipesReporter.java | 19 +++++++--
.../reporters/jdbc/TestJDBCPipesReporter.java | 47 +++++++++++++++++-----
.../{ => configs}/tika-config-excludes.xml | 2 +-
.../{ => configs}/tika-config-includes.xml | 2 +-
4 files changed, 53 insertions(+), 17 deletions(-)
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index f29c2f986..835b50ec1 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -60,8 +60,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
private static final long MAX_WAIT_MILLIS = 120000;
private String connectionString;
- private ArrayBlockingQueue<KeyStatusPair> queue =
- new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE);
+ private final ArrayBlockingQueue<KeyStatusPair> queue =
+ new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
CompletableFuture<Void> reportWorkerFuture;
@Override
@@ -186,6 +186,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
return;
}
if (p == KeyStatusPair.END_SEMAPHORE) {
+ LOG.trace("received end semaphore");
try {
reportNow();
} catch (SQLException e) {
@@ -193,6 +194,15 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
} catch (InterruptedException e) {
return;
}
+ LOG.trace("about to close");
+ try {
+ insert.close();
+ connection.close();
+ LOG.trace("successfully closed resources");
+ } catch (SQLException e) {
+ LOG.warn("problem shutting down reporter", e);
+ }
+
return;
}
cache.add(p);
@@ -220,6 +230,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
insert.addBatch();
}
insert.executeBatch();
+ LOG.debug("writing {} " + cache.size());
cache.clear();
return;
} catch (SQLException e) {
@@ -233,7 +244,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
try (Statement st = connection.createStatement()) {
String sql = "drop table if exists " + TABLE_NAME;
st.execute(sql);
- sql = "create table " + TABLE_NAME + " (emit_key varchar(512), status varchar(32))";
+ sql = "create table " + TABLE_NAME + " (path varchar(1024), status varchar(32))";
st.execute(sql);
}
}
@@ -262,7 +273,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
}
private void createPreparedStatement() throws SQLException {
- String sql = "insert into " + TABLE_NAME + " (emit_key, status) values (?,?)";
+ String sql = "insert into " + TABLE_NAME + " (path, status) values (?,?)";
insert = connection.prepareStatement(sql);
}
}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
index 188e262a9..75f9fe36e 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -22,8 +22,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@@ -42,7 +44,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.PipesReporter;
@@ -55,16 +59,20 @@ import org.apache.tika.pipes.pipesiterator.TotalCountResult;
public class TestJDBCPipesReporter {
@Test
- public void testBasic() throws Exception {
+ public void testBasic(@TempDir Path tmpDir) throws Exception {
+ Files.createDirectories(tmpDir.resolve("db"));
+ Path dbDir = tmpDir.resolve("db/h2");
+ String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
int numThreads = 10;
int numIterations = 200;
- String connectionString = "jdbc:h2:mem:test_tika";
JDBCPipesReporter reporter = new JDBCPipesReporter();
reporter.setConnection(connectionString);
reporter.initialize(new HashMap<>());
Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
reporter.close();
+
Map<PipesResult.STATUS, Long> total = countReported(connectionString);
assertEquals(expected.size(), total.size());
long sum = 0;
@@ -77,13 +85,19 @@ public class TestJDBCPipesReporter {
}
@Test
- public void testIncludes() throws Exception {
- Path p = Paths.get(this.getClass().getResource("/tika-config-includes.xml").toURI());
- AsyncConfig asyncConfig = AsyncConfig.load(p);
+ public void testIncludes(@TempDir Path tmpDir) throws Exception {
+ Files.createDirectories(tmpDir.resolve("db"));
+ Path dbDir = tmpDir.resolve("db/h2");
+ Path config = tmpDir.resolve("tika-config.xml");
+ String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+ writeConfig("/configs/tika-config-includes.xml",
+ connectionString, config);
+
+ AsyncConfig asyncConfig = AsyncConfig.load(config);
PipesReporter reporter = asyncConfig.getPipesReporter();
int numThreads = 10;
int numIterations = 200;
- String connectionString = "jdbc:h2:mem:test_tika";
Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
reporter.close();
@@ -103,13 +117,18 @@ public class TestJDBCPipesReporter {
}
@Test
- public void testExcludes() throws Exception {
- Path p = Paths.get(this.getClass().getResource("/tika-config-excludes.xml").toURI());
- AsyncConfig asyncConfig = AsyncConfig.load(p);
+ public void testExcludes(@TempDir Path tmpDir) throws Exception {
+ Files.createDirectories(tmpDir.resolve("db"));
+ Path dbDir = tmpDir.resolve("db/h2");
+ Path config = tmpDir.resolve("tika-config.xml");
+ String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+ writeConfig("/configs/tika-config-excludes.xml",
+ connectionString, config);
+ AsyncConfig asyncConfig = AsyncConfig.load(config);
PipesReporter reporter = asyncConfig.getPipesReporter();
int numThreads = 10;
int numIterations = 200;
- String connectionString = "jdbc:h2:mem:test_tika";
Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations);
reporter.close();
@@ -235,4 +254,10 @@ public class TestJDBCPipesReporter {
return written;
}
}
+
+ private void writeConfig(String srcConfig, String dbDir, Path config) throws IOException {
+ String xml = IOUtils.resourceToString(srcConfig, StandardCharsets.UTF_8);
+ xml = xml.replace("CONNECTION_STRING", dbDir);
+ Files.write(config, xml.getBytes(StandardCharsets.UTF_8));
+ }
}
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
similarity index 96%
rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
index f35b27952..ab7682237 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml
@@ -35,7 +35,7 @@
</params>
<pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
<params>
- <connection>jdbc:h2:mem:test_tika</connection>
+ <connection>CONNECTION_STRING</connection>
<excludes>
<exclude>PARSE_SUCCESS</exclude>
<exclude>PARSE_SUCCESS_WITH_EXCEPTION</exclude>
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
similarity index 96%
rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
index fa7c74fcf..1c3c68663 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml
@@ -35,7 +35,7 @@
</params>
<pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
<params>
- <connection>jdbc:h2:mem:test_tika</connection>
+ <connection>CONNECTION_STRING</connection>
<includes>
<include>PARSE_SUCCESS</include>
<include>PARSE_SUCCESS_WITH_EXCEPTION</include>