Posted to commits@camel.apache.org by gg...@apache.org on 2019/09/09 14:17:54 UTC

[camel] branch camel-2.24.x updated: [CAMEL-13951] Implement PostgresAggregationRepository to handle special PostgreSQL behavior

This is an automated email from the ASF dual-hosted git repository.

ggrzybek pushed a commit to branch camel-2.24.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.24.x by this push:
     new ab9746c  [CAMEL-13951] Implement PostgresAggregationRepository to handle special PostgreSQL behavior
ab9746c is described below

commit ab9746c1d3b0a2101f0d2d098212d26aacabb17d
Author: Grzegorz Grzybek <gr...@gmail.com>
AuthorDate: Mon Sep 9 14:58:20 2019 +0200

    [CAMEL-13951] Implement PostgresAggregationRepository to handle special PostgreSQL behavior
    
    (cherry picked from commit 6974f9b60a504eb967b5e643254c441040df7f9c)
    (cherry picked from commit 6f2348cf09a14ba4863e2364289a151da865fbcd)
---
 .../camel-sql/src/main/docs/sql-component.adoc     | 48 +++++++++++-
 .../aggregate/jdbc/JdbcAggregationRepository.java  | 19 +++--
 .../jdbc/PostgresAggregationRepository.java        | 91 ++++++++++++++++++++++
 3 files changed, 151 insertions(+), 7 deletions(-)

diff --git a/components/camel-sql/src/main/docs/sql-component.adoc b/components/camel-sql/src/main/docs/sql-component.adoc
index 4a5c590..80ed04f 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -765,9 +765,9 @@ JDBC vendor.
 <bean id="repo"
 class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
   <property name="transactionManager" ref="transactionManager"/>
-  <propertyname="repositoryName" value="aggregation"/>
+  <property name="repositoryName" value="aggregation"/>
   <property name="dataSource" ref="dataSource"/>
-  <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
+  <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
 </bean>
 <!-- use the default mapper with extraFQN class names from our JDBC driver -->
 <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
@@ -780,6 +780,50 @@ class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
 </bean>
 -----
 
+=== Propagation behavior
+
+`JdbcAggregationRepository` uses two distinct _transaction templates_ from Spring-TX. One is read-only
+and one is used for read-write operations.
+
+However, when `JdbcAggregationRepository` is used within a route that itself uses `<transacted />` and both share a
+common `PlatformTransactionManager`, you may need to configure the _propagation behavior_ used by
+the transaction templates inside `JdbcAggregationRepository`.
+
+Here's a way to do it:
+[source,xml]
+----
+<bean id="repo"
+class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+  <property name="propagationBehaviorName" value="PROPAGATION_NESTED" />
+</bean>
+----
+
+Propagation is specified by constants of the `org.springframework.transaction.TransactionDefinition` interface,
+so `propagationBehaviorName` is a convenient setter that accepts the names of those constants.
+
+=== PostgreSQL case
+
+PostgreSQL is a special case that may cause problems with the optimistic locking used by `JdbcAggregationRepository`.
+PostgreSQL marks the connection as invalid after a data integrity violation exception (the one with SQLState 23505).
+This makes the connection effectively unusable within a nested transaction.
+Details can be found
+https://www.postgresql.org/message-id/200609241203.59292.ralf.wiebicke%40exedio.com[in this document].
+
+`org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository` extends `JdbcAggregationRepository` and
+uses a special `INSERT .. ON CONFLICT ..` statement to provide optimistic locking behavior.
+
+With the default aggregation table definition, this statement is:
+[source,sql]
+----
+INSERT INTO aggregation (exchange, id) VALUES (?, ?) ON CONFLICT DO NOTHING
+----
+
+Details can be found https://www.postgresql.org/docs/9.5/sql-insert.html[in the PostgreSQL documentation].
+
+When this clause is used, the `java.sql.PreparedStatement.executeUpdate()` call returns `0` instead of throwing
+an SQLException with SQLState=23505. Further handling is exactly the same as with the generic `JdbcAggregationRepository`,
+but without marking the PostgreSQL connection as invalid.
+
 === Camel Sql Starter
 
 A starter module is available to spring-boot users. When using the starter,
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 597767a..4e93389 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -59,11 +59,11 @@ import org.springframework.transaction.support.TransactionTemplate;
  */
 public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
 
+    protected static final String EXCHANGE = "exchange";
+    protected static final String ID = "id";
+    protected static final String BODY = "body";
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
-    private static final String ID = "id";
-    private static final String EXCHANGE = "exchange";
-    private static final String BODY = "body";
     private static final Constants PROPAGATION_CONSTANTS = new Constants(TransactionDefinition.class);
 
     private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper();
@@ -240,9 +240,9 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
         insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true);
     }
 
-    protected void insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception {
+    protected int insertAndUpdateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final boolean idComesFirst) throws Exception {
         final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders);
-        jdbcTemplate.execute(sql,
+        Integer updateCount = jdbcTemplate.execute(sql,
                 new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
                     @Override
                     protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException {
@@ -265,6 +265,7 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
                         }
                     }
                 });
+        return updateCount == null ? 0 : updateCount;
     }
 
     @Override
@@ -443,6 +444,10 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
         return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty();
     }
 
+    public List<String> getHeadersToStoreAsText() {
+        return headersToStoreAsText;
+    }
+
     /**
      * Allows to store headers as String which is human readable. By default this option is disabled,
      * storing the headers in binary format.
@@ -453,6 +458,10 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
         this.headersToStoreAsText = headersToStoreAsText;
     }
 
+    public boolean isStoreBodyAsText() {
+        return storeBodyAsText;
+    }
+
     /**
      * Whether to store the message body as String which is human readable.
      * By default this option is false storing the body in binary format.
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
new file mode 100644
index 0000000..f023432
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.transaction.PlatformTransactionManager;
+
+/**
+ * PostgreSQL-specific {@link JdbcAggregationRepository} that deals with SQL Violation Exceptions
+ * using a special {@code INSERT INTO .. ON CONFLICT DO NOTHING} clause.
+ */
+public class PostgresAggregationRepository extends JdbcAggregationRepository {
+
+    /**
+     * Creates an aggregation repository
+     */
+    public PostgresAggregationRepository() {
+    }
+
+    /**
+     * Creates an aggregation repository with the three mandatory parameters
+     */
+    public PostgresAggregationRepository(PlatformTransactionManager transactionManager, String repositoryName, DataSource dataSource) {
+        super(transactionManager, repositoryName, dataSource);
+    }
+
+    /**
+     * Inserts a new record into the given repository table
+     *
+     * @param camelContext   the current CamelContext
+     * @param correlationId  the correlation key
+     * @param exchange       the aggregated exchange
+     * @param repositoryName The name of the table
+     */
+    protected void insert(final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName) throws Exception {
+        // The default totalParameterIndex is 2 for ID and Exchange. Depending on logic this will be increased
+        int totalParameterIndex = 2;
+        StringBuilder queryBuilder = new StringBuilder()
+                .append("INSERT INTO ").append(repositoryName)
+                .append('(')
+                .append(EXCHANGE).append(", ")
+                .append(ID);
+
+        if (isStoreBodyAsText()) {
+            queryBuilder.append(", ").append(BODY);
+            totalParameterIndex++;
+        }
+
+        if (hasHeadersToStoreAsText()) {
+            for (String headerName : getHeadersToStoreAsText()) {
+                queryBuilder.append(", ").append(headerName);
+                totalParameterIndex++;
+            }
+        }
+
+        queryBuilder.append(") VALUES (");
+
+        for (int i = 0; i < totalParameterIndex - 1; i++) {
+            queryBuilder.append("?, ");
+        }
+        queryBuilder.append("?)");
+
+        queryBuilder.append(" ON CONFLICT DO NOTHING");
+
+        String sql = queryBuilder.toString();
+
+        int updateCount = insertAndUpdateHelper(camelContext, correlationId, exchange, sql, true);
+        if (updateCount == 0 && getRepositoryName().equals(repositoryName)) {
+            throw new DataIntegrityViolationException("No row was inserted due to data violation");
+        }
+    }
+
+}
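
For readers following the new insert() method above, here is a minimal standalone sketch of how the statement text is assembled. It uses no Camel or Spring classes. The class name OnConflictInsertSketch and the method buildInsertSql are hypothetical and not part of this commit; the column names mirror the EXCHANGE, ID and BODY constants, and the header names in main() are made-up examples.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not part of the commit: mirrors only the string
// building done in PostgresAggregationRepository.insert().
public class OnConflictInsertSketch {

    public static String buildInsertSql(String table, boolean storeBodyAsText,
                                        List<String> headersToStoreAsText) {
        // Two placeholders are always needed: one for the exchange, one for the id.
        int totalParameters = 2;
        StringBuilder sql = new StringBuilder()
                .append("INSERT INTO ").append(table)
                .append('(')
                .append("exchange").append(", ")
                .append("id");

        if (storeBodyAsText) {
            sql.append(", ").append("body");
            totalParameters++;
        }

        // Each header stored as text gets its own column and placeholder.
        for (String headerName : headersToStoreAsText) {
            sql.append(", ").append(headerName);
            totalParameters++;
        }

        sql.append(") VALUES (");
        for (int i = 0; i < totalParameters - 1; i++) {
            sql.append("?, ");
        }
        sql.append("?)");

        // With this clause a conflicting insert reports an update count of 0
        // instead of raising a unique-violation error (SQLState 23505).
        sql.append(" ON CONFLICT DO NOTHING");
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildInsertSql("aggregation", false, Collections.emptyList()));
        System.out.println(buildInsertSql("aggregation", true,
                Arrays.asList("companyName", "accountName")));
    }
}
```

In the commit itself, the resulting update count is what signals the conflict: insert() throws DataIntegrityViolationException when insertAndUpdateHelper returns 0 for the main repository table.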