You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Mirek Borsky <ge...@email.cz> on 2021/06/22 14:36:58 UTC

FILE component with concurrent consumers and JDBC idempotent store producing "Violation of UNIQUE KEY" warnings

Greetings,

I am trying to implement concurrent file consumers using jdbc as idempotent store, with MSSQL DB.
It is working, but when the file is locked by other nodes, the route is producing WARN messages to my log, even though the readLockLoggingLevel is set to OFF.
Could someone kindly point out, am I doing something wrong?

I have reproduced this in karaf 4.3.2 and Camel 3.10.0

The route is:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
	<property-placeholder xmlns="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" persistent-id="FileTransfer_Route" id="extProperties">
		<default-properties>
			<property name="src.dir" value="c:/Aplikace/src"/>
			<property name="arch.dir" value="c:/Aplikace/arch"/>
		</default-properties>
	</property-placeholder>
	<bean id="localHost" class="java.net.InetAddress" factory-method="getLocalHost"/>
	<bean id="hostName" class="java.lang.String" factory-ref="localHost" factory-method="getHostName"/>
	<reference id="dataSource" interface="javax.sql.DataSource" filter="(osgi.jndi.service.name=jdbc/etl)"/>
	<bean id="messageIdStore" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
		<argument ref="dataSource"/>
		<argument ref="hostName"/>
		<property name="createString" value="CREATE TABLE CAMEL_MESSAGEPROCESSED(processorName varchar(255) NULL, messageId varchar(100) NULL, createdAt datetime NULL, CONSTRAINT uMessageId UNIQUE(messageId))"/>
	</bean>
	<camelContext xmlns="http://camel.apache.org/schema/blueprint" id="FileTransfer_Route-ctx">
		<propertyPlaceholder id="properties" location="blueprint:extProperties"/>
		<route>
			<from uri="file:{{src.dir}}?move={{arch.dir}}/${header.CamelFileName}&amp;recursive=true&amp;autoCreate=false&amp;startingDirectoryMustExist=true&amp;delay=1s&amp;greedy=true&amp;readLock=idempotent-changed&amp;readLockMinLength=0&amp;readLockCheckInterval=1000&amp;readLockTimeout=2000&amp;idempotentRepository=#messageIdStore&amp;readLockRemoveOnCommit=true&amp;readLockIdempotentReleaseAsync=true&amp;readLockIdempotentReleaseDelay=60000&amp;readLockLoggingLevel=OFF"/>

			<to uri="file:e/dst?tempFileName=${header.CamelFileNameOnly}.tmp"/>
		</route>
	</camelContext>
</blueprint>


The error message is:
16:18:09.709 WARN [Camel (FileTransfer_Route-ctx) thread #0 - file://c:/Aplikace/src] file://c:/Aplikace/src?autoCreate=false&delay=1s&greedy=true&idempotentRepository=%23messageIdStore&move=c%3A%2FAplikace%2Farch%2F%24%7Bheader.CamelFileName%7D&readLock=idempotent-changed&readLockCheckInterval=1000&readLockIdempotentReleaseAsync=true&readLockIdempotentReleaseDelay=60000&readLockLoggingLevel=OFF&readLockMinLength=0&readLockRemoveOnCommit=true&readLockTimeout=2000&recursive=true&startingDirectoryMustExist=true cannot begin processing file: GenericFile[c:\Aplikace\src\file-currently-transfered-by-other-node] due to: PreparedStatementCallback; SQL [INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).. Caused by: [org.springframework.dao.DuplicateKeyException - PreparedStatementCallback; SQL [INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).]
org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; SQL [INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).
        at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:247) ~[!/:?]
        at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70) ~[!/:?]
        at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1541) ~[!/:?]
        at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667) ~[!/:?]
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960) ~[!/:?]
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015) ~[!/:?]
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1025) ~[!/:?]
        at org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository.insert(JdbcMessageIdRepository.java:118) ~[!/:3.10.0]
        at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:137) ~[!/:3.10.0]
        at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:133) ~[!/:3.10.0]
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[!/:?]
        at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository.add(AbstractJdbcMessageIdRepository.java:133) ~[!/:3.10.0]
        at org.apache.camel.spi.IdempotentRepository.add(IdempotentRepository.java:95) ~[!/:3.10.0]
        at org.apache.camel.component.file.strategy.FileIdempotentChangedRepositoryReadLockStrategy.acquireExclusiveReadLock(FileIdempotentChangedRepositoryReadLockStrategy.java:89) ~[!/:3.10.0]
        at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.begin(GenericFileProcessStrategySupport.java:72) ~[!/:3.10.0]
        at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.begin(GenericFileRenameProcessStrategy.java:39) ~[!/:3.10.0]
        at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:389) [!/:3.10.0]
        at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:246) [!/:3.10.0]
        at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:207) [!/:3.10.0]
        at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190) [!/:3.10.0]
        at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107) [!/:3.10.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_265]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_265]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_265]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_265]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_265]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_265]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).
        at net.sourceforge.jtds.jdbc.SQLDiagnostic.addDiagnostic(SQLDiagnostic.java:372) ~[!/:?]
        at net.sourceforge.jtds.jdbc.TdsCore.tdsErrorToken(TdsCore.java:2988) ~[!/:?]
        at net.sourceforge.jtds.jdbc.TdsCore.nextToken(TdsCore.java:2421) ~[!/:?]
        at net.sourceforge.jtds.jdbc.TdsCore.getMoreResults(TdsCore.java:671) ~[!/:?]
        at net.sourceforge.jtds.jdbc.JtdsStatement.processResults(JtdsStatement.java:613) ~[!/:?]
        at net.sourceforge.jtds.jdbc.JtdsStatement.executeSQL(JtdsStatement.java:572) ~[!/:?]
        at net.sourceforge.jtds.jdbc.JtdsPreparedStatement.executeUpdate(JtdsPreparedStatement.java:727) ~[!/:?]
        at org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:965) ~[!/:?]
        at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651) ~[!/:?]
        ... 24 more


I will appreciate any pointers how to prevent those log messages, as they are a part of normal operations.

kind regards
Mirek Borský


Re: FILE component with concurrent consumers and JDBC idempotent store producing "Violation of UNIQUE KEY" warnings

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Ah yeah we can likely improve this out of the box by making that add
operation deal with exceptions and regard it as the lock was not
acquired.
You are welcome to create a JIRA

If you want a workaround you can just extend that idempotent repo
class and add your own try .. catch in that add method to ignore the
exception.

On Tue, Jun 22, 2021 at 4:37 PM Mirek Borsky <ge...@email.cz> wrote:
>
> Greetings,
>
> I am trying to implement concurrent file consumers using jdbc as idempotent store, with MSSQL DB.
> It is working, but when the file is locked by other nodes, the route is producing WARN messages to my log, even though the readLockLoggingLevel is set to OFF.
> Could someone kindly point out, am I doing something wrong?
>
> I have reproduced this in karaf 4.3.2 and Camel 3.10.0
>
> The route is:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
>            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>            xmlns:xs="http://www.w3.org/2001/XMLSchema"
>            xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
>         <property-placeholder xmlns="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" persistent-id="FileTransfer_Route" id="extProperties">
>                 <default-properties>
>                         <property name="src.dir" value="c:/Aplikace/src"/>
>                         <property name="arch.dir" value="c:/Aplikace/arch"/>
>                 </default-properties>
>         </property-placeholder>
>         <bean id="localHost" class="java.net.InetAddress" factory-method="getLocalHost"/>
>         <bean id="hostName" class="java.lang.String" factory-ref="localHost" factory-method="getHostName"/>
>         <reference id="dataSource" interface="javax.sql.DataSource" filter="(osgi.jndi.service.name=jdbc/etl)"/>
>         <bean id="messageIdStore" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
>                 <argument ref="dataSource"/>
>                 <argument ref="hostName"/>
>                 <property name="createString" value="CREATE TABLE CAMEL_MESSAGEPROCESSED(processorName varchar(255) NULL, messageId varchar(100) NULL, createdAt datetime NULL, CONSTRAINT uMessageId UNIQUE(messageId))"/>
>         </bean>
>         <camelContext xmlns="http://camel.apache.org/schema/blueprint" id="FileTransfer_Route-ctx">
>                 <propertyPlaceholder id="properties" location="blueprint:extProperties"/>
>                 <route>
>                         <from uri="file:{{src.dir}}?move={{arch.dir}}/${header.CamelFileName}&amp;recursive=true&amp;autoCreate=false&amp;startingDirectoryMustExist=true&amp;delay=1s&amp;greedy=true&amp;readLock=idempotent-changed&amp;readLockMinLength=0&amp;readLockCheckInterval=1000&amp;readLockTimeout=2000&amp;idempotentRepository=#messageIdStore&amp;readLockRemoveOnCommit=true&amp;readLockIdempotentReleaseAsync=true&amp;readLockIdempotentReleaseDelay=60000&amp;readLockLoggingLevel=OFF"/>
>
>                         <to uri="file:e/dst?tempFileName=${header.CamelFileNameOnly}.tmp"/>
>                 </route>
>         </camelContext>
> </blueprint>
>
>
> The error message is:
> 16:18:09.709 WARN [Camel (FileTransfer_Route-ctx) thread #0 - file://c:/Aplikace/src] file://c:/Aplikace/src?autoCreate=false&delay=1s&greedy=true&idempotentRepository=%23messageIdStore&move=c%3A%2FAplikace%2Farch%2F%24%7Bheader.CamelFileName%7D&readLock=idempotent-changed&readLockCheckInterval=1000&readLockIdempotentReleaseAsync=true&readLockIdempotentReleaseDelay=60000&readLockLoggingLevel=OFF&readLockMinLength=0&readLockRemoveOnCommit=true&readLockTimeout=2000&recursive=true&startingDirectoryMustExist=true cannot begin processing file: GenericFile[c:\Aplikace\src\file-currently-transfered-by-other-node] due to: PreparedStatementCallback; SQL [INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).. Caused by: [org.springframework.dao.DuplicateKeyException - PreparedStatementCallback; SQL [INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).]
> org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; SQL [INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).
>         at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:247) ~[!/:?]
>         at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70) ~[!/:?]
>         at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1541) ~[!/:?]
>         at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667) ~[!/:?]
>         at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960) ~[!/:?]
>         at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015) ~[!/:?]
>         at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1025) ~[!/:?]
>         at org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository.insert(JdbcMessageIdRepository.java:118) ~[!/:3.10.0]
>         at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:137) ~[!/:3.10.0]
>         at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:133) ~[!/:3.10.0]
>         at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[!/:?]
>         at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository.add(AbstractJdbcMessageIdRepository.java:133) ~[!/:3.10.0]
>         at org.apache.camel.spi.IdempotentRepository.add(IdempotentRepository.java:95) ~[!/:3.10.0]
>         at org.apache.camel.component.file.strategy.FileIdempotentChangedRepositoryReadLockStrategy.acquireExclusiveReadLock(FileIdempotentChangedRepositoryReadLockStrategy.java:89) ~[!/:3.10.0]
>         at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.begin(GenericFileProcessStrategySupport.java:72) ~[!/:3.10.0]
>         at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.begin(GenericFileRenameProcessStrategy.java:39) ~[!/:3.10.0]
>         at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:389) [!/:3.10.0]
>         at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:246) [!/:3.10.0]
>         at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:207) [!/:3.10.0]
>         at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190) [!/:3.10.0]
>         at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107) [!/:3.10.0]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_265]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_265]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_265]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_265]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_265]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_265]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).
>         at net.sourceforge.jtds.jdbc.SQLDiagnostic.addDiagnostic(SQLDiagnostic.java:372) ~[!/:?]
>         at net.sourceforge.jtds.jdbc.TdsCore.tdsErrorToken(TdsCore.java:2988) ~[!/:?]
>         at net.sourceforge.jtds.jdbc.TdsCore.nextToken(TdsCore.java:2421) ~[!/:?]
>         at net.sourceforge.jtds.jdbc.TdsCore.getMoreResults(TdsCore.java:671) ~[!/:?]
>         at net.sourceforge.jtds.jdbc.JtdsStatement.processResults(JtdsStatement.java:613) ~[!/:?]
>         at net.sourceforge.jtds.jdbc.JtdsStatement.executeSQL(JtdsStatement.java:572) ~[!/:?]
>         at net.sourceforge.jtds.jdbc.JtdsPreparedStatement.executeUpdate(JtdsPreparedStatement.java:727) ~[!/:?]
>         at org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:965) ~[!/:?]
>         at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651) ~[!/:?]
>         ... 24 more
>
>
> I will appreciate any pointers how to prevent those log messages, as they are a part of normal operations.
>
> kind regards
> Mirek Borský
>


-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2