You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2020/04/08 15:02:20 UTC
[tika] 01/02: TIKA-3085 -- switch to batch inserts in tika-eval
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tika.git
commit 43f06c904fcdb01244ba4c90fdbbfb881e92e0f6
Author: tallison <ta...@apache.org>
AuthorDate: Wed Apr 8 10:46:59 2020 -0400
TIKA-3085 -- switch to batch inserts in tika-eval
---
.../java/org/apache/tika/eval/db/JDBCUtil.java | 30 +++++++++++++++++++
.../java/org/apache/tika/eval/io/DBWriter.java | 35 +++++++++++++++++-----
2 files changed, 58 insertions(+), 7 deletions(-)
diff --git a/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java b/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java
index 794c55b..5c3e427 100644
--- a/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java
+++ b/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java
@@ -140,6 +140,10 @@ public class JDBCUtil {
return tables;
}
+ @Deprecated
+ /**
+ * @deprecated use {@link #batchInsert(PreparedStatement, TableInfo, Map)}
+ */
public static int insert(PreparedStatement insertStatement,
TableInfo table,
Map<Cols, String> data) throws SQLException {
@@ -165,6 +169,28 @@ public class JDBCUtil {
}
}
+ /**
+ * Binds one row of {@code data} to {@code insertStatement} and adds it to the
+ * statement's current batch via {@link PreparedStatement#addBatch()}. The row is
+ * not sent to the database until the caller runs
+ * {@link PreparedStatement#executeBatch()}.
+ * <p>
+ * Parameters are bound in the order of {@code table.getColInfos()}, starting at 1.
+ * Columns of {@code table} absent from {@code data} are passed to
+ * {@link #updateInsertStatement} as {@code null}.
+ * <p>
+ * Best-effort: if binding fails with an {@link SQLException}, the failure is logged
+ * (with stack trace) and the row is skipped rather than aborting the caller.
+ *
+ * @param insertStatement prepared INSERT for {@code table}
+ * @param table           table whose column order defines the parameter positions
+ * @param data            values for this row; every key must be a column of {@code table}
+ * @throws IllegalArgumentException if {@code data} has a key that is not a column of {@code table}
+ * @throws SQLException kept for signature compatibility; binding errors are logged, not thrown
+ */
+ public static void batchInsert(PreparedStatement insertStatement,
+         TableInfo table,
+         Map<Cols, String> data) throws SQLException {
+     // Validate first so an invalid row never leaves partially-bound
+     // parameters on the shared statement.
+     for (Cols c : data.keySet()) {
+         if (!table.containsColumn(c)) {
+             throw new IllegalArgumentException("Can't add data to " + c +
+                     " because it doesn't exist in the table: " + table.getName());
+         }
+     }
+     try {
+         int i = 1;
+         for (ColInfo colInfo : table.getColInfos()) {
+             updateInsertStatement(i, insertStatement, colInfo, data.get(colInfo.getName()));
+             i++;
+         }
+         insertStatement.addBatch();
+     } catch (SQLException e) {
+         // Skip this row, but keep the table name and full stack trace for diagnosis
+         // (SLF4J treats a trailing Throwable argument as the exception to print).
+         LOG.warn("couldn't insert data for this row in table ({}): {}",
+                 table.getName(), e.getMessage(), e);
+     }
+ }
+
public static void updateInsertStatement(int dbColOffset, PreparedStatement st,
ColInfo colInfo, String value) throws SQLException {
if (value == null) {
@@ -178,9 +204,13 @@ public class JDBCUtil {
value = value.substring(0, colInfo.getPrecision());
LOG.warn("truncated varchar value in {} : {}", colInfo.getName(), value);
}
+ //postgres doesn't allow \0000
+ value = value.replaceAll("\u0000", " ");
st.setString(dbColOffset, value);
break;
case Types.CHAR:
+ //postgres doesn't allow \0000
+ value = value.replaceAll("\u0000", " ");
st.setString(dbColOffset, value);
break;
case Types.DOUBLE:
diff --git a/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java b/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java
index 8aea3cd..909727a 100644
--- a/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java
+++ b/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java
@@ -50,8 +50,8 @@ public class DBWriter implements IDBWriter {
private static final Logger LOG = LoggerFactory.getLogger(DBWriter.class);
private static final AtomicInteger WRITER_ID = new AtomicInteger();
- private final AtomicLong insertedRows = new AtomicLong();
- private final Long commitEveryX = 1000L;
+ // Per-table commit threshold: a table's batch is executed and committed each time
+ // its row count reaches a multiple of this value. Primitive long avoids unboxing
+ // on the per-row modulo check.
+ private final long commitEveryXRows = 10000L;
+ //private final Long commitEveryXMS = 60000L;
private final Connection conn;
private final JDBCUtil dbUtil;
@@ -60,7 +60,7 @@ public class DBWriter implements IDBWriter {
//<tableName, preparedStatement>
private final Map<String, PreparedStatement> inserts = new HashMap<>();
-
+ //<tableName, row count and last-commit time for that table's batch>
+ private final Map<String, LastInsert> lastInsertMap = new HashMap<>();
public DBWriter(Connection connection, List<TableInfo> tableInfos, JDBCUtil dbUtil, MimeBuffer mimeBuffer)
throws IOException, SQLException {
@@ -71,6 +71,7 @@ public class DBWriter implements IDBWriter {
try {
PreparedStatement st = createPreparedInsert(tableInfo);
inserts.put(tableInfo.getName(), st);
+ lastInsertMap.put(tableInfo.getName(), new LastInsert());
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -115,11 +116,19 @@ public class DBWriter implements IDBWriter {
throw new RuntimeException("Failed to create prepared statement for: "+
table.getName());
}
- dbUtil.insert(p, table, data);
- long rows = insertedRows.incrementAndGet();
- if (rows % commitEveryX == 0) {
- LOG.debug("writer ({}) is committing after {} rows", myId, rows);
+ dbUtil.batchInsert(p, table, data);
+ LastInsert lastInsert = lastInsertMap.get(table.getName());
+ lastInsert.rowCount++;
+ long elapsed = System.currentTimeMillis()-lastInsert.lastInsert;
+ if (
+ //elapsed > commitEveryXMS ||
+ lastInsert.rowCount % commitEveryXRows == 0) {
+ LOG.info("writer ({}) on table ({}) is committing after {} rows and {} ms", myId,
+ table.getName(),
+ lastInsert.rowCount, elapsed);
+ p.executeBatch();
conn.commit();
+ lastInsert.lastInsert = System.currentTimeMillis();
}
} catch (SQLException e) {
throw new IOException(e);
@@ -127,6 +136,13 @@ public class DBWriter implements IDBWriter {
}
public void close() throws IOException {
+ for (PreparedStatement p : inserts.values()) {
+ try {
+ p.executeBatch();
+ } catch (SQLException e) {
+ throw new IOExceptionWithCause(e);
+ }
+ }
try {
conn.commit();
} catch (SQLException e){
@@ -139,4 +155,9 @@ public class DBWriter implements IDBWriter {
}
}
+
+ /**
+ * Per-table bookkeeping for batched inserts. Declared static because it never
+ * uses the enclosing writer, so it should not hold a hidden reference to it.
+ */
+ private static final class LastInsert {
+     // wall-clock ms of the last commit triggered by this table,
+     // or of this object's creation if no commit has happened yet
+     private long lastInsert = System.currentTimeMillis();
+     // rows added to this table's batch so far (cumulative; not reset on commit)
+     private long rowCount = 0;
+ }
}