You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2023/02/17 16:45:37 UTC

[tika] branch main updated: TIKA-3977 -- add postconnection sql call, clean up closing of resources and add max retries to JDBCEmitter (#971)

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e61a5809 TIKA-3977 -- add postconnection sql call, clean up closing of resources and add max retries to JDBCEmitter (#971)
5e61a5809 is described below

commit 5e61a58096e9898a7b862bf623e68ba77206d341
Author: Tim Allison <ta...@apache.org>
AuthorDate: Fri Feb 17 11:45:32 2023 -0500

    TIKA-3977 -- add postconnection sql call, clean up closing of resources and add max retries to JDBCEmitter (#971)
---
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       | 157 ++++++++++++++-------
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    |  92 +++++++++---
 2 files changed, 174 insertions(+), 75 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index 968cbb4ad..e51351430 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -28,11 +28,13 @@ import java.sql.Types;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -50,6 +52,7 @@ import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.utils.StringUtils;
 
@@ -75,11 +78,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         FIRST_ONLY, CONCATENATE
         //anything else?
     }
+
     //some file formats do not have time zones...
     //try both
-    private static final String[] TIKA_DATE_PATTERNS = new String[] {
-            "yyyy-MM-dd'T'HH:mm:ss'Z'","yyyy-MM-dd'T'HH:mm:ss"
-    };
+    private static final String[] TIKA_DATE_PATTERNS =
+            new String[]{"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"};
     //the "write" lock is used for creating the table
     private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();
     //this keeps track of which table + connection string have been created
@@ -88,9 +91,14 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     //different jdbc emitters.
     private static Set<String> TABLES_CREATED = new HashSet<>();
     private String connectionString;
+
+    private Optional<String> postConnectionString = Optional.empty();
     private String insert;
     private String createTable;
     private String alterTable;
+
+    private int maxRetries = 0;
+
     private Map<String, String> keys;
     private Connection connection;
     private PreparedStatement insertStatement;
@@ -112,6 +120,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
             dateFormats[i++] = new SimpleDateFormat(p, Locale.US);
         }
     }
+
     /**
      * This is called immediately after the table is created.
      * The purpose of this is to allow for adding a complex primary key or
@@ -134,8 +143,25 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     }
 
     @Field
-    public void setConnection(String connectionString) {
-        this.connectionString = connectionString;
+    public void setConnection(String connection) {
+        this.connectionString = connection;
+    }
+
+    public void setMaxRetries(int maxRetries) {
+        this.maxRetries = maxRetries;
+    }
+
+    /**
+     * Sql to execute on every newly opened connection. This was initially added
+     * for setting pragmas on sqlite3, but may be used for other per-connection
+     * configuration in other dbs. It runs immediately after each connection is
+     * opened (including reconnects), before the table is created if needed.
+     *
+     * @param postConnection sql statement to execute; must not be null
+     */
+    @Field
+    public void setPostConnection(String postConnection) {
+        this.postConnectionString = Optional.of(postConnection);
     }
 
     /**
@@ -143,7 +169,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
      * a multivalued field in a metadata object, do you want the first value only
      * or should we concatenate these with the
      * {@link JDBCEmitter#setMultivaluedFieldDelimiter(String)}.
-     *
+     * <p>
      * The default values as of 2.6.1 are {@link MultivaluedFieldStrategy#CONCATENATE}
      * and the default delimiter is &quot;, &quot;
      *
@@ -169,7 +195,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
 
     /**
      * See {@link JDBCEmitter#setMultivaluedFieldDelimiter(String)}
-      * @param delimiter
+     *
+     * @param delimiter
      */
     @Field
     public void setMultivaluedFieldDelimiter(String delimiter) {
@@ -209,7 +236,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
      * This executes the emit with each call.  For more efficient
      * batch execution use {@link #emit(List)}.
      *
-     * @param emitKey emit key
+     * @param emitKey      emit key
      * @param metadataList list of metadata per file
      * @throws IOException
      * @throws TikaEmitterException
@@ -220,52 +247,43 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         if (metadataList == null || metadataList.size() < 1) {
             return;
         }
-        try {
-            if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
-                insertFirstOnly(emitKey, metadataList);
-                insertStatement.execute();
-            } else {
-                insertAll(emitKey, metadataList);
-                insertStatement.executeBatch();
-            }
-        } catch (SQLException e) {
-            try {
-                LOGGER.warn("problem during emit; going to try to reconnect", e);
-                //something went wrong
-                //try to reconnect
-                reconnect();
-            } catch (SQLException ex) {
-                throw new TikaEmitterException("Couldn't reconnect!", ex);
-            }
-            throw new TikaEmitterException("couldn't emit", e);
-        }
+        List<EmitData> emitDataList = new ArrayList<>();
+        emitDataList.add(new EmitData(new EmitKey("", emitKey), metadataList));
+        emit(emitDataList);
     }
 
     @Override
     public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
-        try {
-            if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
-                for (EmitData d : emitData) {
-                    insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList());
-                    insertStatement.addBatch();
-                }
-            } else {
-                for (EmitData d : emitData) {
-                    insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList());
+        int tries = 0;
+        Exception ex = null;
+        while (tries++ <= maxRetries) {
+            try {
+                emitNow(emitData);
+                return;
+            } catch (SQLException e) {
+                try {
+                    reconnect();
+                } catch (SQLException exc) {
+                    throw new TikaEmitterException("couldn't reconnect!", exc);
                 }
+                ex = e;
             }
-            insertStatement.executeBatch();
-        } catch (SQLException e) {
-            try {
-                LOGGER.warn("problem during emit; going to try to reconnect", e);
-                //something went wrong
-                //try to reconnect
-                reconnect();
-            } catch (SQLException ex) {
-                throw new TikaEmitterException("Couldn't reconnect!", ex);
+        }
+        throw new TikaEmitterException("Couldn't emit " + emitData.size() + " records.", ex);
+    }
+
+    private void emitNow(List<? extends EmitData> emitData) throws SQLException {
+        if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
+            for (EmitData d : emitData) {
+                insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList());
+                insertStatement.addBatch();
+            }
+        } else {
+            for (EmitData d : emitData) {
+                insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList());
             }
-            throw new TikaEmitterException("couldn't emit", e);
         }
+        insertStatement.executeBatch();
     }
 
     private void insertAll(String emitKey, List<Metadata> metadataList) throws SQLException {
@@ -299,7 +317,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         SQLException ex = null;
         for (int i = 0; i < 3; i++) {
             try {
-                connection = DriverManager.getConnection(connectionString);
+                tryClose();
+                createConnection();
                 insertStatement = connection.prepareStatement(insert);
                 return;
             } catch (SQLException e) {
@@ -310,6 +329,33 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         throw ex;
     }
 
+    private void tryClose() {
+        if (insertStatement != null) {
+            try {
+                insertStatement.close();
+            } catch (SQLException e) {
+                LOGGER.warn("exception closing insert", e);
+            }
+        }
+
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                LOGGER.warn("exception closing connection", e);
+            }
+        }
+    }
+
+    private void createConnection() throws SQLException {
+        connection = DriverManager.getConnection(connectionString);
+        if (postConnectionString.isPresent()) {
+            try (Statement st = connection.createStatement()) {
+                st.execute(postConnectionString.get());
+            }
+        }
+    }
+
     private void updateValue(PreparedStatement insertStatement, int i, String key, String type,
                              int metadataListIndex, List<Metadata> metadataList)
             throws SQLException {
@@ -352,7 +398,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     }
 
     private String getVal(Metadata metadata, String key, String type) {
-        if (! type.equals("string") && ! type.startsWith("varchar")) {
+        if (!type.equals("string") && !type.startsWith("varchar")) {
             return metadata.get(key);
         }
         if (multivaluedFieldStrategy == MultivaluedFieldStrategy.FIRST_ONLY) {
@@ -380,7 +426,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         return sb.toString();
     }
 
-    private void updateDouble(PreparedStatement insertStatement, int i, String val) throws SQLException {
+    private void updateDouble(PreparedStatement insertStatement, int i, String val)
+            throws SQLException {
         if (StringUtils.isBlank(val)) {
             insertStatement.setNull(i, Types.DOUBLE);
             return;
@@ -390,8 +437,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     }
 
     private void updateVarchar(String key, String type, PreparedStatement insertStatement, int i,
-                               String val)
-            throws SQLException {
+                               String val) throws SQLException {
         if (StringUtils.isBlank(val)) {
             updateString(insertStatement, i, val);
             return;
@@ -480,7 +526,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     public void initialize(Map<String, Param> params) throws TikaConfigException {
 
         try {
-            connection = DriverManager.getConnection(connectionString);
+            createConnection();
         } catch (SQLException e) {
             throw new TikaConfigException("couldn't open connection: " + connectionString, e);
         }
@@ -509,8 +555,6 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         } catch (SQLException e) {
             throw new TikaConfigException("can't create insert statement", e);
         }
-
-
     }
 
     @Override
@@ -524,6 +568,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
      */
     @Override
     public void close() throws IOException {
+        try {
+            insertStatement.close();
+        } catch (SQLException e) {
+            LOGGER.warn("problem closing insert", e);
+        }
         try {
             connection.close();
         } catch (SQLException e) {
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index 835b50ec1..febb9cc26 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -25,6 +25,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -60,6 +61,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
     private static final long MAX_WAIT_MILLIS = 120000;
 
     private String connectionString;
+
+    private Optional<String> postConnectionString = Optional.empty();
     private final ArrayBlockingQueue<KeyStatusPair> queue =
             new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
     CompletableFuture<Void> reportWorkerFuture;
@@ -70,7 +73,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         if (StringUtils.isBlank(connectionString)) {
             throw new TikaConfigException("Must specify a connectionString");
         }
-        ReportWorker reportWorker = new ReportWorker(connectionString, queue);
+        ReportWorker reportWorker = new ReportWorker(connectionString, postConnectionString, queue);
         reportWorker.init();
         reportWorkerFuture = CompletableFuture.runAsync(reportWorker);
     }
@@ -87,6 +90,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         this.connectionString = connection;
     }
 
+    /**
+     * Sql to execute on every newly opened connection. This was initially added
+     * for setting pragmas on sqlite3, but may be used for other per-connection
+     * configuration in other dbs. It runs immediately after each connection is
+     * opened (including reconnects), before the table is created if needed.
+     *
+     * @param postConnection sql statement to execute; must not be null
+     */
+    @Field
+    public void setPostConnection(String postConnection) {
+        this.postConnectionString = Optional.of(postConnection);
+    }
 
     @Override
     public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
@@ -155,13 +170,17 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
 
         private static final int MAX_TRIES = 3;
         private final String connectionString;
+        private final Optional<String> postConnectionString;
         private final ArrayBlockingQueue<KeyStatusPair> queue;
         List<KeyStatusPair> cache = new ArrayList<>();
         private Connection connection;
         private PreparedStatement insert;
 
-        public ReportWorker(String connectionString, ArrayBlockingQueue<KeyStatusPair> queue) {
+        public ReportWorker(String connectionString,
+                            Optional<String> postConnectionString,
+                            ArrayBlockingQueue<KeyStatusPair> queue) {
             this.connectionString = connectionString;
+            this.postConnectionString = postConnectionString;
             this.queue = queue;
         }
 
@@ -186,23 +205,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                     return;
                 }
                 if (p == KeyStatusPair.END_SEMAPHORE) {
-                    LOG.trace("received end semaphore");
-                    try {
-                        reportNow();
-                    } catch (SQLException e) {
-                        throw new RuntimeException(e);
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                    LOG.trace("about to close");
-                    try {
-                        insert.close();
-                        connection.close();
-                        LOG.trace("successfully closed resources");
-                    } catch (SQLException e) {
-                        LOG.warn("problem shutting down reporter", e);
-                    }
-
+                    shutdownNow();
                     return;
                 }
                 cache.add(p);
@@ -219,6 +222,29 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
 
         }
 
+        /**
+         * Flushes whatever is still cached, then releases the insert statement
+         * and the connection. The release happens in a finally block so that
+         * neither resource leaks if the final flush throws or is interrupted.
+         */
+        private void shutdownNow() {
+            LOG.trace("received end semaphore");
+            try {
+                reportNow();
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                //restore the interrupt status for the caller; cleanup still runs below
+                Thread.currentThread().interrupt();
+            } finally {
+                LOG.trace("about to close");
+                //tryClose() null-checks each resource and logs (does not throw)
+                //any SQLException raised while closing
+                tryClose();
+                LOG.trace("finished closing resources");
+            }
+        }
+
         private void reportNow() throws SQLException, InterruptedException {
             int attempt = 0;
             while (++attempt < MAX_TRIES) {
@@ -254,22 +280,46 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             SQLException ex = null;
             while (++attempts < 3) {
                 try {
+                    tryClose();
                     createConnection();
                     createPreparedStatement();
+                    LOG.debug("success reconnecting after {} attempts", attempts);
                     return;
                 } catch (SQLException e) {
                     LOG.warn("problem reconnecting", e);
-                    //if there's a failure, wait 10 seconds
+                    //if there's a failure, wait 30 seconds
                     //and hope the db is back up.
-                    Thread.sleep(10000);
+                    Thread.sleep(30000);
                     ex = e;
                 }
             }
             throw ex;
         }
 
+        private void tryClose() {
+            if (insert != null) {
+                try {
+                    insert.close();
+                } catch (SQLException e) {
+                    LOG.warn("exception closing insert statement", e);
+                }
+            }
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    LOG.warn("exception closing connection", e);
+                }
+            }
+        }
+
         private void createConnection() throws SQLException {
             connection = DriverManager.getConnection(connectionString);
+            if (postConnectionString.isPresent()) {
+                try (Statement st = connection.createStatement()) {
+                    st.execute(postConnectionString.get());
+                }
+            }
         }
 
         private void createPreparedStatement() throws SQLException {