You are viewing a plain-text version of this content. The hyperlink to its canonical location was not preserved in this copy.
Posted to commits@tika.apache.org by ta...@apache.org on 2024/03/20 18:31:44 UTC

(tika) branch TIKA-4213 created (now 51477fff2)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a change to branch TIKA-4213
in repository https://gitbox.apache.org/repos/asf/tika.git


      at 51477fff2 TIKA-4213

This branch includes the following new commits:

     new 51477fff2 TIKA-4213

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(tika) 01/01: TIKA-4213

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4213
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 51477fff2e8979d087a11de7d23a21ead7bbd1de
Author: tallison <ta...@apache.org>
AuthorDate: Wed Mar 20 14:31:34 2024 -0400

    TIKA-4213
---
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 52 ++++++++++++----------
 1 file changed, 29 insertions(+), 23 deletions(-)

diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index 0082eb9de..0d6165140 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -22,6 +22,8 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +70,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
     private String connectionString;
 
     private Optional<String> postConnectionString = Optional.empty();
-    private final ArrayBlockingQueue<KeyStatusPair> queue =
+    private final ArrayBlockingQueue<IdStatusPair> queue =
             new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
     CompletableFuture<Void> reportWorkerFuture;
 
@@ -146,7 +148,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             return;
         }
         try {
-            queue.offer(new KeyStatusPair(t.getEmitKey().getEmitKey(), result.getStatus()),
+            queue.offer(new IdStatusPair(t.getId(), result.getStatus()),
                     MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             //swallow
@@ -167,7 +169,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
     @Override
     public void close() throws IOException {
         try {
-            queue.offer(KeyStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS);
+            queue.offer(IdStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             return;
         }
@@ -186,20 +188,20 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
     }
 
-    private static class KeyStatusPair {
+    private static class IdStatusPair {
 
-        static KeyStatusPair END_SEMAPHORE = new KeyStatusPair(null, null);
-        private final String emitKey;
+        static IdStatusPair END_SEMAPHORE = new IdStatusPair(null, null);
+        private final String id;
         private final PipesResult.STATUS status;
 
-        public KeyStatusPair(String emitKey, PipesResult.STATUS status) {
-            this.emitKey = emitKey;
+        public IdStatusPair(String id, PipesResult.STATUS status) {
+            this.id = id;
             this.status = status;
         }
 
         @Override
         public String toString() {
-            return "KeyStatusPair{" + "emitKey='" + emitKey + '\'' + ", status=" + status + '}';
+            return "KeyStatusPair{" + "id='" + id + '\'' + ", status=" + status + '}';
         }
     }
 
@@ -208,18 +210,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         private static final int MAX_TRIES = 3;
         private final String connectionString;
         private final Optional<String> postConnectionString;
-        private final ArrayBlockingQueue<KeyStatusPair> queue;
+        private final ArrayBlockingQueue<IdStatusPair> queue;
         private final int cacheSize;
         private final long reportWithinMs;
 
-        List<KeyStatusPair> cache = new ArrayList<>();
+        List<IdStatusPair> cache = new ArrayList<>();
         private Connection connection;
         private PreparedStatement insert;
 
 
         public ReportWorker(String connectionString,
                             Optional<String> postConnectionString,
-                            ArrayBlockingQueue<KeyStatusPair> queue, int cacheSize,
+                            ArrayBlockingQueue<IdStatusPair> queue, int cacheSize,
                             long reportWithinMs) {
             this.connectionString = connectionString;
             this.postConnectionString = postConnectionString;
@@ -242,18 +244,19 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         public void run() {
             long lastReported = System.currentTimeMillis();
             while (true) {
-                //blocking
-                KeyStatusPair p = null;
+                IdStatusPair p = null;
                 try {
-                    p = queue.take();
+                    p = queue.poll(reportWithinMs, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     return;
                 }
-                if (p == KeyStatusPair.END_SEMAPHORE) {
-                    shutdownNow();
-                    return;
+                if (p != null) {
+                    if (p == IdStatusPair.END_SEMAPHORE) {
+                        shutdownNow();
+                        return;
+                    }
+                    cache.add(p);
                 }
-                cache.add(p);
                 long elapsed = System.currentTimeMillis() - lastReported;
 
                 if (cache.size() >= cacheSize || elapsed > reportWithinMs) {
@@ -296,10 +299,11 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             int attempt = 0;
             while (++attempt < MAX_TRIES) {
                 try {
-                    for (KeyStatusPair p : cache) {
+                    for (IdStatusPair p : cache) {
                         insert.clearParameters();
-                        insert.setString(1, p.emitKey);
+                        insert.setString(1, p.id);
                         insert.setString(2, p.status.name());
+                        insert.setTimestamp(3, Timestamp.from(Instant.now()));
                         insert.addBatch();
                     }
                     insert.executeBatch();
@@ -317,7 +321,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             try (Statement st = connection.createStatement()) {
                 String sql = "drop table if exists " + TABLE_NAME;
                 st.execute(sql);
-                sql = "create table " + TABLE_NAME + " (path varchar(1024), status varchar(32))";
+                sql = "create table " + TABLE_NAME + " (id varchar(1024), status varchar(32), " +
+                        "timestamp timestamptz)";
                 st.execute(sql);
             }
         }
@@ -370,7 +375,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
 
         private void createPreparedStatement() throws SQLException {
-            String sql = "insert into " + TABLE_NAME + " (path, status) values (?,?)";
+            //do we want to do an upsert?
+            String sql = "insert into " + TABLE_NAME + " (id, status, timestamp) values (?,?,?)";
             insert = connection.prepareStatement(sql);
         }
     }