You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gg...@apache.org on 2019/09/09 13:55:56 UTC
[camel] branch master updated: [CAMEL-13951] Implement
PostgresAggregationRepository to handle special PostgreSQL behavior
This is an automated email from the ASF dual-hosted git repository.
ggrzybek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 6974f9b [CAMEL-13951] Implement PostgresAggregationRepository to handle special PostgreSQL behavior
new b385b2c Merge pull request #3156 from grgrzybek/CAMEL-13951
6974f9b is described below
commit 6974f9b60a504eb967b5e643254c441040df7f9c
Author: Grzegorz Grzybek <gr...@gmail.com>
AuthorDate: Mon Sep 9 14:58:20 2019 +0200
[CAMEL-13951] Implement PostgresAggregationRepository to handle special PostgreSQL behavior
---
.../camel-sql/src/main/docs/sql-component.adoc | 48 +++++++++++-
.../aggregate/jdbc/JdbcAggregationRepository.java | 19 +++--
.../jdbc/PostgresAggregationRepository.java | 91 ++++++++++++++++++++++
3 files changed, 151 insertions(+), 7 deletions(-)
diff --git a/components/camel-sql/src/main/docs/sql-component.adoc b/components/camel-sql/src/main/docs/sql-component.adoc
index f6866ea..99d027d 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -775,9 +775,9 @@ JDBC vendor.
<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
<property name="transactionManager" ref="transactionManager"/>
- <propertyname="repositoryName" value="aggregation"/>
+ <property name="repositoryName" value="aggregation"/>
<property name="dataSource" ref="dataSource"/>
- <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
+ <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
</bean>
<!-- use the default mapper with extraFQN class names from our JDBC driver -->
<bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
@@ -790,6 +790,50 @@ class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
</bean>
----
+=== Propagation behavior
+
+`JdbcAggregationRepository` uses two distinct _transaction templates_ from Spring-TX. One is read-only
+and one is used for read-write operations.
+
+However, when using `JdbcAggregationRepository` within a route that itself uses `<transacted />` and there's
+common `PlatformTransactionManager` used, there may be a need to configure _propagation behavior_ used by
+transaction templates inside `JdbcAggregationRepository`.
+
+Here's a way to do it:
+[source,xml]
+----
+<bean id="repo"
+class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+ <property name="propagationBehaviorName" value="PROPAGATION_NESTED" />
+</bean>
+----
+
+Propagation is specified by constants of `org.springframework.transaction.TransactionDefinition` interface,
+so `propagationBehaviorName` is convenient setter that allows to use names of the constants.
+
+=== PostgreSQL case
+
+There's special database that may cause problems with optimistic locking used by `JdbcAggregationRepository`.
+PostgreSQL marks connection as invalid in case of data integrity violation exception (the one with SQLState 23505).
+This makes the connection effectively unusable within nested transaction.
+Details can be found
+https://www.postgresql.org/message-id/200609241203.59292.ralf.wiebicke%40exedio.com[in this document].
+
+`org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository` extends `JdbcAggregationRepository` and
+uses special `INSERT .. ON CONFLICT ..` statement to provide optimistic locking behavior.
+
+This statement is (with default aggregation table definition):
+[source,sql]
+----
+INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING
+----
+
+Details can be found https://www.postgresql.org/docs/9.5/sql-insert.html[in PostgreSQL documentation].
+
+When this clause is used, `java.sql.PreparedStatement.executeUpdate()` call returns `0` instead of throwing
+SQLException with SQLState=23505. Further handling is exactly the same as with generic `JdbcAggregationRepository`,
+but without marking PostgreSQL connection as invalid.
+
== Camel Sql Starter
A starter module is available to spring-boot users. When using the starter,
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index b580a13..90a76f8 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -64,11 +64,11 @@ import org.springframework.util.FileCopyUtils;
*/
public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
+ protected static final String EXCHANGE = "exchange";
+ protected static final String ID = "id";
+ protected static final String BODY = "body";
private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
- private static final String ID = "id";
- private static final String EXCHANGE = "exchange";
- private static final String BODY = "body";
private static final Constants PROPAGATION_CONSTANTS = new Constants(TransactionDefinition.class);
private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper();
@@ -245,9 +245,9 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true);
}
- protected void insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception {
+ protected int insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception {
final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
- jdbcTemplate.execute(sql,
+ Integer updateCount = jdbcTemplate.execute(sql,
new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
@Override
protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
@@ -270,6 +270,7 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
}
}
});
+ return updateCount == null ? 0 : updateCount;
}
@Override
@@ -462,6 +463,10 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty();
}
+ public List<String> getHeadersToStoreAsText() {
+ return headersToStoreAsText;
+ }
+
/**
* Allows to store headers as String which is human readable. By default this option is disabled,
* storing the headers in binary format.
@@ -472,6 +477,10 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
this.headersToStoreAsText = headersToStoreAsText;
}
+ public boolean isStoreBodyAsText() {
+ return storeBodyAsText;
+ }
+
/**
* Whether to store the message body as String which is human readable.
* By default this option is false storing the body in binary format.
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
new file mode 100644
index 0000000..635a32a
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.transaction.PlatformTransactionManager;
+
+/**
+ * PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL Violation Exceptions
+ * using special {@code INSERT INTO .. ON CONFLICT DO NOTHING} claues.
+ */
+public class PostgresAggregationRepository extends JdbcAggregationRepository {
+
+ /**
+ * Creates an aggregation repository
+ */
+ public PostgresAggregationRepository() {
+ }
+
+ /**
+ * Creates an aggregation repository with the three mandatory parameters
+ */
+ public PostgresAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName, DataSource dataSource) {
+ super(transactionManager, repositoryName, dataSource);
+ }
+
+ /**
+ * Inserts a new record into the given repository table
+ *
+ * @param camelContext the current CamelContext
+ * @param correlationId the correlation key
+ * @param exchange the aggregated exchange
+ * @param repositoryName The name of the table
+ */
+ protected void insert(final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName) throws Exception {
+ // The default totalParameterIndex is 2 for ID and Exchange. Depending on logic this will be increased
+ int totalParameterIndex = 2;
+ StringBuilder queryBuilder = new StringBuilder()
+ .append("INSERT INTO ").append(repositoryName)
+ .append('(')
+ .append(EXCHANGE).append(", ")
+ .append(ID);
+
+ if (isStoreBodyAsText()) {
+ queryBuilder.append(", ").append(BODY);
+ totalParameterIndex++;
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : getHeadersToStoreAsText()) {
+ queryBuilder.append(", ").append(headerName);
+ totalParameterIndex++;
+ }
+ }
+
+ queryBuilder.append(") VALUES (");
+
+ for (int i = 0; i < totalParameterIndex - 1; i++) {
+ queryBuilder.append("?, ");
+ }
+ queryBuilder.append("?)");
+
+ queryBuilder.append(" ON CONFLICT DO NOTHING");
+
+ String sql = queryBuilder.toString();
+
+ int updateCount = insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true);
+ if (updateCount == 0 && getRepositoryName().equals(repositoryName)) {
+ throw new DataIntegrityViolationException("No row was inserted due to data violation");
+ }
+ }
+
+}