You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/09/06 12:09:53 UTC

[tika] branch main updated: TIKA-3846 -- improve jdbc emitter to do batch updates when emit is called with a list of emit data

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new deeec7772 TIKA-3846 -- improve jdbc emitter to do batch updates when emit is called with a list of emit data
deeec7772 is described below

commit deeec77722f28b98560a26e062cb28fa1a9f65ea
Author: tballison <ta...@apache.org>
AuthorDate: Tue Sep 6 08:09:43 2022 -0400

    TIKA-3846 -- improve jdbc emitter to do batch updates when emit is called with a list of emit data
---
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       | 100 ++++++++++++---------
 1 file changed, 59 insertions(+), 41 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index 274ae5d6c..14965f345 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -41,6 +41,7 @@ import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
 import org.apache.tika.utils.StringUtils;
 
@@ -52,15 +53,9 @@ import org.apache.tika.utils.StringUtils;
  */
 public class JDBCEmitter extends AbstractEmitter implements Initializable, Closeable {
 
-    public enum AttachmentStrategy {
-        FIRST_ONLY, ALL
-        //anything else?
-    }
-
     private static final Logger LOGGER = LoggerFactory.getLogger(JDBCEmitter.class);
     //the "write" lock is used for creating the table
     private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();
-
     //this keeps track of which table + connection string have been created
     //so that only one table is created per table + connection string.
     //This is necessary for testing and if someone specifies multiple
@@ -73,10 +68,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
     private Map<String, String> keys;
     private Connection connection;
     private PreparedStatement insertStatement;
-
     private AttachmentStrategy attachmentStrategy = AttachmentStrategy.FIRST_ONLY;
 
-
     /**
      * This is called immediately after the table is created.
      * The purpose of this is to allow for adding a complex primary key or
@@ -132,32 +125,29 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         }
     }
 
+    /**
+     * This executes the emit with each call.  For more efficient
+     * batch execution use {@link #emit(List)}.
+     *
+     * @param emitKey emit key
+     * @param metadataList list of metadata per file
+     * @throws IOException
+     * @throws TikaEmitterException
+     */
     @Override
     public void emit(String emitKey, List<Metadata> metadataList)
             throws IOException, TikaEmitterException {
         if (metadataList == null || metadataList.size() < 1) {
             return;
         }
-        if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
-            emitFirstOnly(emitKey, metadataList);
-        } else {
-            emitAll(emitKey, metadataList);
-        }
-    }
-
-    private void emitAll(String emitKey, List<Metadata> metadataList) throws TikaEmitterException {
         try {
-            for (int i = 0; i < metadataList.size(); i++) {
-                insertStatement.clearParameters();
-                int col = 0;
-                insertStatement.setString(++col, emitKey);
-                insertStatement.setInt(++col, i);
-                for (Map.Entry<String, String> e : keys.entrySet()) {
-                    updateValue(insertStatement, ++col, e.getKey(), e.getValue(), i, metadataList);
-                }
-                insertStatement.addBatch();
+            if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
+                insertFirstOnly(emitKey, metadataList);
+                insertStatement.execute();
+            } else {
+                insertAll(emitKey, metadataList);
+                insertStatement.executeBatch();
             }
-            insertStatement.executeBatch();
         } catch (SQLException e) {
             try {
                 LOGGER.warn("problem during emit; going to try to reconnect", e);
@@ -169,20 +159,22 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
             }
             throw new TikaEmitterException("couldn't emit", e);
         }
-
     }
 
-    private void emitFirstOnly(String emitKey, List<Metadata> metadataList)
-            throws TikaEmitterException {
-
+    @Override
+    public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
         try {
-            insertStatement.clearParameters();
-            int i = 0;
-            insertStatement.setString(++i, emitKey);
-            for (Map.Entry<String, String> e : keys.entrySet()) {
-                updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList);
+            if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
+                for (EmitData d : emitData) {
+                    insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList());
+                    insertStatement.addBatch();
+                }
+            } else {
+                for (EmitData d : emitData) {
+                    insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList());
+                }
             }
-            insertStatement.execute();
+            insertStatement.executeBatch();
         } catch (SQLException e) {
             try {
                 LOGGER.warn("problem during emit; going to try to reconnect", e);
@@ -194,7 +186,28 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
             }
             throw new TikaEmitterException("couldn't emit", e);
         }
+    }
+
+    private void insertAll(String emitKey, List<Metadata> metadataList) throws SQLException {
+        for (int i = 0; i < metadataList.size(); i++) {
+            insertStatement.clearParameters();
+            int col = 0;
+            insertStatement.setString(++col, emitKey);
+            insertStatement.setInt(++col, i);
+            for (Map.Entry<String, String> e : keys.entrySet()) {
+                updateValue(insertStatement, ++col, e.getKey(), e.getValue(), i, metadataList);
+            }
+            insertStatement.addBatch();
+        }
+    }
 
+    private void insertFirstOnly(String emitKey, List<Metadata> metadataList) throws SQLException {
+        insertStatement.clearParameters();
+        int i = 0;
+        insertStatement.setString(++i, emitKey);
+        for (Map.Entry<String, String> e : keys.entrySet()) {
+            updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList);
+        }
     }
 
     private void reconnect() throws SQLException {
@@ -330,11 +343,6 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
         //require
     }
 
-    /*
-        TODO: This is currently not ever called.  We need rework the PipesParser
-        to ensure that emitters are closed cleanly.
-     */
-
     /**
      * @throws IOException
      */
@@ -346,4 +354,14 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
             throw new IOException(e);
         }
     }
+
+    /*
+        TODO: close() is currently never called.  We need to rework the PipesParser
+        to ensure that emitters are closed cleanly.
+     */
+
+    public enum AttachmentStrategy {
+        FIRST_ONLY, ALL
+        //anything else?
+    }
 }