You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labeled "here") was not preserved in this version.
Posted to commits@tika.apache.org by ta...@apache.org on 2024/03/21 12:42:30 UTC
(tika) branch main updated: TIKA-4213 -- improve jdbc pipes reporter (#1669)
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new e63730e12 TIKA-4213 -- improve jdbc pipes reporter (#1669)
e63730e12 is described below
commit e63730e126e74b4ac36e5f2b8c6790963eb41c14
Author: Tim Allison <ta...@apache.org>
AuthorDate: Thu Mar 21 08:42:25 2024 -0400
TIKA-4213 -- improve jdbc pipes reporter (#1669)
* TIKA-4213 -- improve jdbc pipes reporter
---
.../pipes/reporters/jdbc/JDBCPipesReporter.java | 52 ++++++++++++----------
1 file changed, 29 insertions(+), 23 deletions(-)
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index 0082eb9de..ee52bf80f 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -22,6 +22,8 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -68,7 +70,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
private String connectionString;
private Optional<String> postConnectionString = Optional.empty();
- private final ArrayBlockingQueue<KeyStatusPair> queue =
+ private final ArrayBlockingQueue<IdStatusPair> queue =
new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
CompletableFuture<Void> reportWorkerFuture;
@@ -146,7 +148,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
return;
}
try {
- queue.offer(new KeyStatusPair(t.getEmitKey().getEmitKey(), result.getStatus()),
+ queue.offer(new IdStatusPair(t.getId(), result.getStatus()),
MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//swallow
@@ -167,7 +169,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
@Override
public void close() throws IOException {
try {
- queue.offer(KeyStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS);
+ queue.offer(IdStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return;
}
@@ -186,20 +188,20 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
}
}
- private static class KeyStatusPair {
+ private static class IdStatusPair {
- static KeyStatusPair END_SEMAPHORE = new KeyStatusPair(null, null);
- private final String emitKey;
+ static IdStatusPair END_SEMAPHORE = new IdStatusPair(null, null);
+ private final String id;
private final PipesResult.STATUS status;
- public KeyStatusPair(String emitKey, PipesResult.STATUS status) {
- this.emitKey = emitKey;
+ public IdStatusPair(String id, PipesResult.STATUS status) {
+ this.id = id;
this.status = status;
}
@Override
public String toString() {
- return "KeyStatusPair{" + "emitKey='" + emitKey + '\'' + ", status=" + status + '}';
+ return "KeyStatusPair{" + "id='" + id + '\'' + ", status=" + status + '}';
}
}
@@ -208,18 +210,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
private static final int MAX_TRIES = 3;
private final String connectionString;
private final Optional<String> postConnectionString;
- private final ArrayBlockingQueue<KeyStatusPair> queue;
+ private final ArrayBlockingQueue<IdStatusPair> queue;
private final int cacheSize;
private final long reportWithinMs;
- List<KeyStatusPair> cache = new ArrayList<>();
+ List<IdStatusPair> cache = new ArrayList<>();
private Connection connection;
private PreparedStatement insert;
public ReportWorker(String connectionString,
Optional<String> postConnectionString,
- ArrayBlockingQueue<KeyStatusPair> queue, int cacheSize,
+ ArrayBlockingQueue<IdStatusPair> queue, int cacheSize,
long reportWithinMs) {
this.connectionString = connectionString;
this.postConnectionString = postConnectionString;
@@ -242,18 +244,19 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
public void run() {
long lastReported = System.currentTimeMillis();
while (true) {
- //blocking
- KeyStatusPair p = null;
+ IdStatusPair p = null;
try {
- p = queue.take();
+ p = queue.poll(reportWithinMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return;
}
- if (p == KeyStatusPair.END_SEMAPHORE) {
- shutdownNow();
- return;
+ if (p != null) {
+ if (p == IdStatusPair.END_SEMAPHORE) {
+ shutdownNow();
+ return;
+ }
+ cache.add(p);
}
- cache.add(p);
long elapsed = System.currentTimeMillis() - lastReported;
if (cache.size() >= cacheSize || elapsed > reportWithinMs) {
@@ -296,10 +299,11 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
int attempt = 0;
while (++attempt < MAX_TRIES) {
try {
- for (KeyStatusPair p : cache) {
+ for (IdStatusPair p : cache) {
insert.clearParameters();
- insert.setString(1, p.emitKey);
+ insert.setString(1, p.id);
insert.setString(2, p.status.name());
+ insert.setTimestamp(3, Timestamp.from(Instant.now()));
insert.addBatch();
}
insert.executeBatch();
@@ -317,7 +321,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
try (Statement st = connection.createStatement()) {
String sql = "drop table if exists " + TABLE_NAME;
st.execute(sql);
- sql = "create table " + TABLE_NAME + " (path varchar(1024), status varchar(32))";
+ sql = "create table " + TABLE_NAME + " (id varchar(1024), status varchar(32), " +
+ "timestamp timestamp with time zone)";
st.execute(sql);
}
}
@@ -370,7 +375,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
}
private void createPreparedStatement() throws SQLException {
- String sql = "insert into " + TABLE_NAME + " (path, status) values (?,?)";
+ //do we want to do an upsert?
+ String sql = "insert into " + TABLE_NAME + " (id, status, timestamp) values (?,?,?)";
insert = connection.prepareStatement(sql);
}
}