You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2023/02/17 16:45:37 UTC
[tika] branch main updated: TIKA-3977 -- add postconnection sql call, clean up closing of resources and add max retries to JDBCEmitter (#971)
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 5e61a5809 TIKA-3977 -- add postconnection sql call, clean up closing of resources and add max retries to JDBCEmitter (#971)
5e61a5809 is described below
commit 5e61a58096e9898a7b862bf623e68ba77206d341
Author: Tim Allison <ta...@apache.org>
AuthorDate: Fri Feb 17 11:45:32 2023 -0500
TIKA-3977 -- add postconnection sql call, clean up closing of resources and add max retries to JDBCEmitter (#971)
---
.../tika/pipes/emitter/jdbc/JDBCEmitter.java | 157 ++++++++++++++-------
.../pipes/reporters/jdbc/JDBCPipesReporter.java | 92 +++++++++---
2 files changed, 174 insertions(+), 75 deletions(-)
diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index 968cbb4ad..e51351430 100644
--- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -28,11 +28,13 @@ import java.sql.Types;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -50,6 +52,7 @@ import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
+import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.utils.StringUtils;
@@ -75,11 +78,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
FIRST_ONLY, CONCATENATE
//anything else?
}
+
//some file formats do not have time zones...
//try both
- private static final String[] TIKA_DATE_PATTERNS = new String[] {
- "yyyy-MM-dd'T'HH:mm:ss'Z'","yyyy-MM-dd'T'HH:mm:ss"
- };
+ private static final String[] TIKA_DATE_PATTERNS =
+ new String[]{"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"};
//the "write" lock is used for creating the table
private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();
//this keeps track of which table + connection string have been created
@@ -88,9 +91,14 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
//different jdbc emitters.
private static Set<String> TABLES_CREATED = new HashSet<>();
private String connectionString;
+
+ private Optional<String> postConnectionString = Optional.empty();
private String insert;
private String createTable;
private String alterTable;
+
+ private int maxRetries = 0;
+
private Map<String, String> keys;
private Connection connection;
private PreparedStatement insertStatement;
@@ -112,6 +120,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
dateFormats[i++] = new SimpleDateFormat(p, Locale.US);
}
}
+
/**
* This is called immediately after the table is created.
* The purpose of this is to allow for adding a complex primary key or
@@ -134,8 +143,25 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
}
@Field
- public void setConnection(String connectionString) {
- this.connectionString = connectionString;
+ public void setConnection(String connection) {
+ this.connectionString = connection;
+ }
+
+ public void setMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ }
+
+ /**
+ * This sql will be called immediately after the connection is made. This was
+ * initially added for setting pragmas on sqlite3, but may be used for other
+ * connection configuration in other dbs. Note: This is called before the table is
+ * created if it needs to be created.
+ *
+ * @param postConnection
+ */
+ @Field
+ public void setPostConnection(String postConnection) {
+ this.postConnectionString = Optional.of(postConnection);
}
/**
@@ -143,7 +169,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
* a multivalued field in a metadata object, do you want the first value only
* or should we concatenate these with the
* {@link JDBCEmitter#setMultivaluedFieldDelimiter(String)}.
- *
+ * <p>
* The default values as of 2.6.1 are {@link MultivaluedFieldStrategy#CONCATENATE}
* and the default delimiter is ", "
*
@@ -169,7 +195,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
/**
* See {@link JDBCEmitter#setMultivaluedFieldDelimiter(String)}
- * @param delimiter
+ *
+ * @param delimiter
*/
@Field
public void setMultivaluedFieldDelimiter(String delimiter) {
@@ -209,7 +236,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
* This executes the emit with each call. For more efficient
* batch execution use {@link #emit(List)}.
*
- * @param emitKey emit key
+ * @param emitKey emit key
* @param metadataList list of metadata per file
* @throws IOException
* @throws TikaEmitterException
@@ -220,52 +247,43 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
if (metadataList == null || metadataList.size() < 1) {
return;
}
- try {
- if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
- insertFirstOnly(emitKey, metadataList);
- insertStatement.execute();
- } else {
- insertAll(emitKey, metadataList);
- insertStatement.executeBatch();
- }
- } catch (SQLException e) {
- try {
- LOGGER.warn("problem during emit; going to try to reconnect", e);
- //something went wrong
- //try to reconnect
- reconnect();
- } catch (SQLException ex) {
- throw new TikaEmitterException("Couldn't reconnect!", ex);
- }
- throw new TikaEmitterException("couldn't emit", e);
- }
+ List<EmitData> emitDataList = new ArrayList<>();
+ emitDataList.add(new EmitData(new EmitKey("", emitKey), metadataList));
+ emit(emitDataList);
}
@Override
public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException {
- try {
- if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
- for (EmitData d : emitData) {
- insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList());
- insertStatement.addBatch();
- }
- } else {
- for (EmitData d : emitData) {
- insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList());
+ int tries = 0;
+ Exception ex = null;
+ while (tries++ <= maxRetries) {
+ try {
+ emitNow(emitData);
+ return;
+ } catch (SQLException e) {
+ try {
+ reconnect();
+ } catch (SQLException exc) {
+ throw new TikaEmitterException("couldn't reconnect!", exc);
}
+ ex = e;
}
- insertStatement.executeBatch();
- } catch (SQLException e) {
- try {
- LOGGER.warn("problem during emit; going to try to reconnect", e);
- //something went wrong
- //try to reconnect
- reconnect();
- } catch (SQLException ex) {
- throw new TikaEmitterException("Couldn't reconnect!", ex);
+ }
+ throw new TikaEmitterException("Couldn't emit " + emitData.size() + " records.", ex);
+ }
+
+ private void emitNow(List<? extends EmitData> emitData) throws SQLException {
+ if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) {
+ for (EmitData d : emitData) {
+ insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList());
+ insertStatement.addBatch();
+ }
+ } else {
+ for (EmitData d : emitData) {
+ insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList());
}
- throw new TikaEmitterException("couldn't emit", e);
}
+ insertStatement.executeBatch();
}
private void insertAll(String emitKey, List<Metadata> metadataList) throws SQLException {
@@ -299,7 +317,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
SQLException ex = null;
for (int i = 0; i < 3; i++) {
try {
- connection = DriverManager.getConnection(connectionString);
+ tryClose();
+ createConnection();
insertStatement = connection.prepareStatement(insert);
return;
} catch (SQLException e) {
@@ -310,6 +329,33 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
throw ex;
}
+ private void tryClose() {
+ if (insertStatement != null) {
+ try {
+ insertStatement.close();
+ } catch (SQLException e) {
+ LOGGER.warn("exception closing insert", e);
+ }
+ }
+
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOGGER.warn("exception closing connection", e);
+ }
+ }
+ }
+
+ private void createConnection() throws SQLException {
+ connection = DriverManager.getConnection(connectionString);
+ if (postConnectionString.isPresent()) {
+ try (Statement st = connection.createStatement()) {
+ st.execute(postConnectionString.get());
+ }
+ }
+ }
+
private void updateValue(PreparedStatement insertStatement, int i, String key, String type,
int metadataListIndex, List<Metadata> metadataList)
throws SQLException {
@@ -352,7 +398,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
}
private String getVal(Metadata metadata, String key, String type) {
- if (! type.equals("string") && ! type.startsWith("varchar")) {
+ if (!type.equals("string") && !type.startsWith("varchar")) {
return metadata.get(key);
}
if (multivaluedFieldStrategy == MultivaluedFieldStrategy.FIRST_ONLY) {
@@ -380,7 +426,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
return sb.toString();
}
- private void updateDouble(PreparedStatement insertStatement, int i, String val) throws SQLException {
+ private void updateDouble(PreparedStatement insertStatement, int i, String val)
+ throws SQLException {
if (StringUtils.isBlank(val)) {
insertStatement.setNull(i, Types.DOUBLE);
return;
@@ -390,8 +437,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
}
private void updateVarchar(String key, String type, PreparedStatement insertStatement, int i,
- String val)
- throws SQLException {
+ String val) throws SQLException {
if (StringUtils.isBlank(val)) {
updateString(insertStatement, i, val);
return;
@@ -480,7 +526,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
public void initialize(Map<String, Param> params) throws TikaConfigException {
try {
- connection = DriverManager.getConnection(connectionString);
+ createConnection();
} catch (SQLException e) {
throw new TikaConfigException("couldn't open connection: " + connectionString, e);
}
@@ -509,8 +555,6 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
} catch (SQLException e) {
throw new TikaConfigException("can't create insert statement", e);
}
-
-
}
@Override
@@ -524,6 +568,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close
*/
@Override
public void close() throws IOException {
+ try {
+ insertStatement.close();
+ } catch (SQLException e) {
+ LOGGER.warn("problem closing insert", e);
+ }
try {
connection.close();
} catch (SQLException e) {
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index 835b50ec1..febb9cc26 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -25,6 +25,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -60,6 +61,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
private static final long MAX_WAIT_MILLIS = 120000;
private String connectionString;
+
+ private Optional<String> postConnectionString = Optional.empty();
private final ArrayBlockingQueue<KeyStatusPair> queue =
new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
CompletableFuture<Void> reportWorkerFuture;
@@ -70,7 +73,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
if (StringUtils.isBlank(connectionString)) {
throw new TikaConfigException("Must specify a connectionString");
}
- ReportWorker reportWorker = new ReportWorker(connectionString, queue);
+ ReportWorker reportWorker = new ReportWorker(connectionString, postConnectionString, queue);
reportWorker.init();
reportWorkerFuture = CompletableFuture.runAsync(reportWorker);
}
@@ -87,6 +90,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
this.connectionString = connection;
}
+ /**
+ * This sql will be called immediately after the connection is made. This was
+ * initially added for setting pragmas on sqlite3, but may be used for other
+ * connection configuration in other dbs. Note: This is called before the table is
+ * created if it needs to be created.
+ *
+ * @param postConnection
+ */
+ @Field
+ public void setPostConnection(String postConnection) {
+ this.postConnectionString = Optional.of(postConnection);
+ }
@Override
public void report(FetchEmitTuple t, PipesResult result, long elapsed) {
@@ -155,13 +170,17 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
private static final int MAX_TRIES = 3;
private final String connectionString;
+ private final Optional<String> postConnectionString;
private final ArrayBlockingQueue<KeyStatusPair> queue;
List<KeyStatusPair> cache = new ArrayList<>();
private Connection connection;
private PreparedStatement insert;
- public ReportWorker(String connectionString, ArrayBlockingQueue<KeyStatusPair> queue) {
+ public ReportWorker(String connectionString,
+ Optional<String> postConnectionString,
+ ArrayBlockingQueue<KeyStatusPair> queue) {
this.connectionString = connectionString;
+ this.postConnectionString = postConnectionString;
this.queue = queue;
}
@@ -186,23 +205,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
return;
}
if (p == KeyStatusPair.END_SEMAPHORE) {
- LOG.trace("received end semaphore");
- try {
- reportNow();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- return;
- }
- LOG.trace("about to close");
- try {
- insert.close();
- connection.close();
- LOG.trace("successfully closed resources");
- } catch (SQLException e) {
- LOG.warn("problem shutting down reporter", e);
- }
-
+ shutdownNow();
return;
}
cache.add(p);
@@ -219,6 +222,29 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
}
+ private void shutdownNow() {
+ LOG.trace("received end semaphore");
+ try {
+ reportNow();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ return;
+ }
+ LOG.trace("about to close");
+ try {
+ insert.close();
+ } catch (SQLException e) {
+ LOG.warn("problem shutting down insert statement in reporter", e);
+ }
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.warn("problem shutting down connection in reporter", e);
+ }
+ LOG.trace("successfully closed resources");
+ }
+
private void reportNow() throws SQLException, InterruptedException {
int attempt = 0;
while (++attempt < MAX_TRIES) {
@@ -254,22 +280,46 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
SQLException ex = null;
while (++attempts < 3) {
try {
+ tryClose();
createConnection();
createPreparedStatement();
+ LOG.debug("success reconnecting after {} attempts", attempts);
return;
} catch (SQLException e) {
LOG.warn("problem reconnecting", e);
- //if there's a failure, wait 10 seconds
+ //if there's a failure, wait 30 seconds
//and hope the db is back up.
- Thread.sleep(10000);
+ Thread.sleep(30000);
ex = e;
}
}
throw ex;
}
+ private void tryClose() { // best-effort close of the insert statement, then the connection; called before reconnecting
+ if (insert != null) {
+ try {
+ insert.close();
+ } catch (SQLException e) {
+ LOG.warn("exception closing insert statement", e); // log the exception itself so the cause is not lost
+ }
+ }
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.warn("exception closing connection", e);
+ }
+ }
+ }
+
private void createConnection() throws SQLException {
connection = DriverManager.getConnection(connectionString);
+ if (postConnectionString.isPresent()) {
+ try (Statement st = connection.createStatement()) {
+ st.execute(postConnectionString.get());
+ }
+ }
}
private void createPreparedStatement() throws SQLException {