You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2020/04/08 15:34:49 UTC

[tika] 12/14: TIKA-3085 -- switch to batch inserts in tika-eval

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch branch_1x
in repository https://gitbox.apache.org/repos/asf/tika.git

commit a3fef30eb403c6767eaf3a4016a11f7c75c2f869
Author: tallison <ta...@apache.org>
AuthorDate: Wed Apr 8 10:46:59 2020 -0400

    TIKA-3085 -- switch to batch inserts in tika-eval
---
 .../java/org/apache/tika/eval/db/JDBCUtil.java     | 30 +++++++++++++++++++
 .../java/org/apache/tika/eval/io/DBWriter.java     | 35 +++++++++++++++++-----
 2 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java b/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java
index 794c55b..5c3e427 100644
--- a/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java
+++ b/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java
@@ -140,6 +140,10 @@ public class JDBCUtil {
         return tables;
     }
 
+    @Deprecated
+    /**
+     * @deprecated use {@link #batchInsert(PreparedStatement, TableInfo, Map)}
+     */
     public static int insert(PreparedStatement insertStatement,
                              TableInfo table,
                              Map<Cols, String> data) throws SQLException {
@@ -165,6 +169,28 @@ public class JDBCUtil {
         }
     }
 
+    /**
+     * Binds one row of {@code data} to {@code insertStatement} and queues it with
+     * {@link PreparedStatement#addBatch()}. Nothing is executed here; the caller
+     * is responsible for calling {@link PreparedStatement#executeBatch()}.
+     * <p>
+     * Parameters are bound in the order of {@code table.getColInfos()}, starting
+     * at JDBC index 1. A column with no entry in {@code data} is handed to
+     * {@link #updateInsertStatement(int, PreparedStatement, ColInfo, String)}
+     * with a {@code null} value.
+     * <p>
+     * This is best effort per row: a {@link SQLException} raised while binding or
+     * queueing is logged at WARN level and the row is skipped.
+     *
+     * @param insertStatement prepared insert whose placeholders match the table's columns
+     * @param table           description of the target table
+     * @param data            column-to-value map for the row
+     * @throws IllegalArgumentException if {@code data} has a key that is not a column
+     *                                  of {@code table}
+     * @throws SQLException declared on the signature; failures raised inside this
+     *                      method while binding or batching are caught and logged
+     */
+    public static void batchInsert(PreparedStatement insertStatement,
+                             TableInfo table,
+                             Map<Cols, String> data) throws SQLException {
+
+        //validate before touching the statement so that a bad call is rejected
+        //without leaving freshly bound parameters behind
+        for (Cols c : data.keySet()) {
+            if (!table.containsColumn(c)) {
+                throw new IllegalArgumentException("Can't add data to " + c +
+                        " because it doesn't exist in the table: " + table.getName());
+            }
+        }
+
+        try {
+            int i = 1;
+            for (ColInfo colInfo : table.getColInfos()) {
+                updateInsertStatement(i, insertStatement, colInfo, data.get(colInfo.getName()));
+                i++;
+            }
+            insertStatement.addBatch();
+        } catch (SQLException e) {
+            //skip the row, but pass the exception as the trailing argument
+            //so the logger records the stack trace and cause as well
+            LOG.warn("couldn't insert data for this row: {}", e.getMessage(), e);
+        }
+    }
+
     public static void updateInsertStatement(int dbColOffset, PreparedStatement st,
                                              ColInfo colInfo, String value) throws SQLException {
         if (value == null) {
@@ -178,9 +204,13 @@ public class JDBCUtil {
                         value = value.substring(0, colInfo.getPrecision());
                         LOG.warn("truncated varchar value in {} : {}", colInfo.getName(), value);
                     }
+                    //postgres doesn't allow \0000
+                    value = value.replaceAll("\u0000", " ");
                     st.setString(dbColOffset, value);
                     break;
                 case Types.CHAR:
+                    //postgres doesn't allow \0000
+                    value = value.replaceAll("\u0000", " ");
                     st.setString(dbColOffset, value);
                     break;
                 case Types.DOUBLE:
diff --git a/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java b/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java
index 8aea3cd..909727a 100644
--- a/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java
+++ b/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java
@@ -50,8 +50,8 @@ public class DBWriter implements IDBWriter {
     private static final Logger LOG = LoggerFactory.getLogger(DBWriter.class);
 
     private static final AtomicInteger WRITER_ID = new AtomicInteger();
-    private final AtomicLong insertedRows = new AtomicLong();
-    private final Long commitEveryX = 1000L;
+    private final Long commitEveryXRows = 10000L;
+    //private final Long commitEveryXMS = 60000L;
 
     private final Connection conn;
     private final JDBCUtil dbUtil;
@@ -60,7 +60,7 @@ public class DBWriter implements IDBWriter {
 
     //<tableName, preparedStatement>
     private final Map<String, PreparedStatement> inserts = new HashMap<>();
-
+    private final Map<String, LastInsert> lastInsertMap = new HashMap<>();
     public DBWriter(Connection connection, List<TableInfo> tableInfos, JDBCUtil dbUtil, MimeBuffer mimeBuffer)
             throws IOException, SQLException {
 
@@ -71,6 +71,7 @@ public class DBWriter implements IDBWriter {
             try {
                 PreparedStatement st = createPreparedInsert(tableInfo);
                 inserts.put(tableInfo.getName(), st);
+                lastInsertMap.put(tableInfo.getName(), new LastInsert());
             } catch (SQLException e) {
                 throw new RuntimeException(e);
             }
@@ -115,11 +116,19 @@ public class DBWriter implements IDBWriter {
                 throw new RuntimeException("Failed to create prepared statement for: "+
                         table.getName());
             }
-            dbUtil.insert(p, table, data);
-            long rows = insertedRows.incrementAndGet();
-            if (rows % commitEveryX == 0) {
-                LOG.debug("writer ({}) is committing after {} rows", myId, rows);
+            dbUtil.batchInsert(p, table, data);
+            LastInsert lastInsert = lastInsertMap.get(table.getName());
+            lastInsert.rowCount++;
+            long elapsed = System.currentTimeMillis()-lastInsert.lastInsert;
+            if (
+                    //elapsed > commitEveryXMS ||
+                lastInsert.rowCount % commitEveryXRows == 0) {
+                LOG.info("writer ({}) on table ({}) is committing after {} rows and {} ms", myId,
+                        table.getName(),
+                        lastInsert.rowCount, elapsed);
+                p.executeBatch();
                 conn.commit();
+                lastInsert.lastInsert = System.currentTimeMillis();
             }
         } catch (SQLException e) {
             throw new IOException(e);
@@ -127,6 +136,13 @@ public class DBWriter implements IDBWriter {
     }
 
     public void close() throws IOException {
+        for (PreparedStatement p : inserts.values()) {
+            try {
+                p.executeBatch();
+            } catch (SQLException e) {
+                throw new IOExceptionWithCause(e);
+            }
+        }
         try {
             conn.commit();
         } catch (SQLException e){
@@ -139,4 +155,9 @@ public class DBWriter implements IDBWriter {
         }
 
     }
+
+    /**
+     * Mutable per-table bookkeeping for batched inserts. Declared static because
+     * it uses no state from the enclosing writer instance.
+     */
+    private static class LastInsert {
+        //wall-clock millis; starts at creation time and is reset by the writer
+        //after it executes the batch and commits
+        private long lastInsert = System.currentTimeMillis();
+        //running count of rows handed to batchInsert for this table
+        private long rowCount = 0;
+    }
 }