You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/09/06 12:09:53 UTC
[tika] branch main updated: TIKA-3846 -- improve jdbc emitter to do batch updates when emit is called with a list of emit data
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new deeec7772 TIKA-3846 -- improve jdbc emitter to do batch updates when emit is called with a list of emit data
deeec7772 is described below
commit deeec77722f28b98560a26e062cb28fa1a9f65ea
Author: tballison <ta...@apache.org>
AuthorDate: Tue Sep 6 08:09:43 2022 -0400
TIKA-3846 -- improve jdbc emitter to do batch updates when emit is called with a list of emit data
---
.../tika/pipes/emitter/jdbc/JDBCEmitter.java | 100 ++++++++++++---------
1 file changed, 59 insertions(+), 41 deletions(-)
diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index 274ae5d6c..14965f345 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -41,6 +41,7 @@ import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.AbstractEmitter;
+import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.utils.StringUtils;
@@ -52,15 +53,9 @@ import org.apache.tika.utils.StringUtils;
*/
public class JDBCEmitter extends AbstractEmitter implements Initializable, Closeable {
- public enum AttachmentStrategy {
- FIRST_ONLY, ALL
- //anything else?
- }
-
private static final Logger LOGGER = LoggerFactory.getLogger(JDBCEmitter.class);
//the "write" lock is used for creating the table
private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();
-
//this keeps track of which table + connection string have been created
//so that only one table is created per table + connection string.
//This is necessary for testing and if someone specifies multiple
@@ -73,10 +68,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
private Map<String, String> keys;
private Connection connection;
private PreparedStatement insertStatement;
-
private AttachmentStrategy attachmentStrategy = AttachmentStrategy.FIRST_ONLY;
-
/**
* This is called immediately after the table is created.
* The purpose of this is to allow for adding a complex primary key or
@@ -132,32 +125,29 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
}
}
+ /**
+ * This executes the emit with each call. For more efficient
+ * batch execution use {@link #emit(List)}.
+ *
+ * @param emitKey emit key
+ * @param metadataList list of metadata per file
+ * @throws IOException
+ * @throws TikaEmitterException
+ */
@Override
public void emit(String emitKey, List<Metadata> metadataList)
throws IOException, TikaEmitterException {
if (metadataList == null || metadataList.size() < 1) {
return;
}
- if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
- emitFirstOnly(emitKey, metadataList);
- } else {
- emitAll(emitKey, metadataList);
- }
- }
-
- private void emitAll(String emitKey, List<Metadata> metadataList) throws TikaEmitterException {
try {
- for (int i = 0; i < metadataList.size(); i++) {
- insertStatement.clearParameters();
- int col = 0;
- insertStatement.setString(++col, emitKey);
- insertStatement.setInt(++col, i);
- for (Map.Entry<String, String> e : keys.entrySet()) {
- updateValue(insertStatement, ++col, e.getKey(), e.getValue(), i, metadataList);
- }
- insertStatement.addBatch();
+ if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
+ insertFirstOnly(emitKey, metadataList);
+ insertStatement.execute();
+ } else {
+ insertAll(emitKey, metadataList);
+ insertStatement.executeBatch();
}
- insertStatement.executeBatch();
} catch (SQLException e) {
try {
LOGGER.warn("problem during emit; going to try to reconnect", e);
@@ -169,20 +159,22 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
}
throw new TikaEmitterException("couldn't emit", e);
}
-
}
- private void emitFirstOnly(String emitKey, List<Metadata> metadataList)
- throws TikaEmitterException {
-
+ @Override
+ public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
try {
- insertStatement.clearParameters();
- int i = 0;
- insertStatement.setString(++i, emitKey);
- for (Map.Entry<String, String> e : keys.entrySet()) {
- updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList);
+ if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
+ for (EmitData d : emitData) {
+ insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList());
+ insertStatement.addBatch();
+ }
+ } else {
+ for (EmitData d : emitData) {
+ insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList());
+ }
}
- insertStatement.execute();
+ insertStatement.executeBatch();
} catch (SQLException e) {
try {
LOGGER.warn("problem during emit; going to try to reconnect", e);
@@ -194,7 +186,28 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
}
throw new TikaEmitterException("couldn't emit", e);
}
+ }
+
+ private void insertAll(String emitKey, List<Metadata> metadataList) throws SQLException {
+ for (int i = 0; i < metadataList.size(); i++) {
+ insertStatement.clearParameters();
+ int col = 0;
+ insertStatement.setString(++col, emitKey);
+ insertStatement.setInt(++col, i);
+ for (Map.Entry<String, String> e : keys.entrySet()) {
+ updateValue(insertStatement, ++col, e.getKey(), e.getValue(), i, metadataList);
+ }
+ insertStatement.addBatch();
+ }
+ }
+ private void insertFirstOnly(String emitKey, List<Metadata> metadataList) throws SQLException {
+ insertStatement.clearParameters();
+ int i = 0;
+ insertStatement.setString(++i, emitKey);
+ for (Map.Entry<String, String> e : keys.entrySet()) {
+ updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList);
+ }
}
private void reconnect() throws SQLException {
@@ -330,11 +343,6 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
//require
}
- /*
- TODO: This is currently not ever called. We need rework the PipesParser
- to ensure that emitters are closed cleanly.
- */
-
/**
* @throws IOException
*/
@@ -346,4 +354,14 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
throw new IOException(e);
}
}
+
+ /*
+ TODO: This is currently never called. We need to rework the PipesParser
+ to ensure that emitters are closed cleanly.
+ */
+
+ public enum AttachmentStrategy {
+ FIRST_ONLY, ALL
+ //anything else?
+ }
}