You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Claus Ibsen (Jira)" <ji...@apache.org> on 2021/06/24 07:20:00 UTC

[jira] [Assigned] (CAMEL-16750) Do not propagate exception when concurrent FILE component consumers try to acquire lock in JdbcMessageIdRepository

     [ https://issues.apache.org/jira/browse/CAMEL-16750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reassigned CAMEL-16750:
-----------------------------------

    Assignee: Claus Ibsen

> Do not propagate exception when concurrent FILE component consumers try to acquire lock in JdbcMessageIdRepository
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-16750
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16750
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-sql
>            Reporter: Miroslav Borský
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 3.12.0
>
>
> If there are concurrent consumers trying to acquire lock using the org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository class, those not successful will receive an exception and propagate this exception as a warning to the log.
> Use case: Several nodes processing files from common network path / mount. Only one of them will acquire the lock and process the file. This is normal operation and as such it should either not produce any WARN messages or at least not produce them when readLockLoggingLevel=OFF
> Route to reproduce:
> {noformat}
> <?xml version="1.0" encoding="UTF-8"?>
> <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
>            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>            xmlns:xs="http://www.w3.org/2001/XMLSchema"
>            xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 https://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
> 	<property-placeholder xmlns="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" persistent-id="FileTransfer_Route" id="extProperties">
> 		<default-properties>
> 			<property name="src.dir" value="c:/Aplikace/src"/>
> 			<property name="arch.dir" value="c:/Aplikace/arch"/>
> 			<property name="pooling.delay" value="1s"/>
> 			<property name="readLock.checkInterval" value="1000"/>
> 			<property name="readLock.timeout" value="2000"/>
> 			<property name="readLock.release" value="60000"/>
> 			<property name="readLock.wipe" value="120000"/>
> 		</default-properties>
> 	</property-placeholder>
> 	<bean id="localHost" class="java.net.InetAddress" factory-method="getLocalHost"/>
> 	<bean id="hostName" class="java.lang.String" factory-ref="localHost" factory-method="getHostName"/>
> 	<reference id="dataSource" interface="javax.sql.DataSource" filter="(osgi.jndi.service.name=jdbc/etl)"/>
> 	<bean id="messageIdStore" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
> 		<argument ref="dataSource"/>
> 		<argument ref="hostName"/>
> 		<property name="createString" value="CREATE TABLE CAMEL_MESSAGEPROCESSED(processorName [varchar](255) NULL, messageId [varchar](100) NULL, createdAt datetime NULL, CONSTRAINT uMessageId UNIQUE(messageId))"/>
> 	</bean>
> 	<camelContext xmlns="http://camel.apache.org/schema/blueprint" id="FileTransfer_Route-ctx">
> 		<propertyPlaceholder id="properties" location="blueprint:extProperties"/>
> 		<route>
> 			<from uri="file:{{src.dir}}?move={{arch.dir}}/${header.CamelFileName}&amp;recursive=true&amp;autoCreate=false&amp;startingDirectoryMustExist=true&amp;delay={{pooling.delay}}&amp;greedy=true&amp;readLock=idempotent-changed&amp;readLockMinLength=0&amp;readLockCheckInterval={{readLock.checkInterval}}&amp;readLockTimeout={{readLock.timeout}}&amp;idempotentRepository=#messageIdStore&amp;readLockRemoveOnCommit=true&amp;readLockIdempotentReleaseAsync=true&amp;readLockIdempotentReleaseDelay={{readLock.release}}&amp;readLockLoggingLevel=OFF"/>
> 			<to uri="file:e/dst?tempFileName=${header.CamelFileNameOnly}.tmp"/>
> 		</route>
> 	</camelContext>
> </blueprint>
> {noformat}
> Exception stack:
> {noformat}
> 2021-06-22T16:37:42,382 | WARN  | Camel (FileTransfer_Route-ctx) thread #0 - file://c:/Aplikace/src | FileConsumer                     | 105 - org.apache.camel.camel-support - 3.10.0 | file://c:/Aplikace/src?autoCreate=false&delay=1s&greedy=true&idempotentRepository=%23messageIdStore&move=c%3A%2FAplikace%2Farch%2F%24%7Bheader.CamelFileName%7D&readLock=idempotent-changed&readLockCheckInterval=1000&readLockIdempotentReleaseAsync=true&readLockIdempotentReleaseDelay=60000&readLockLoggingLevel=OFF&readLockMinLength=0&readLockRemoveOnCommit=true&readLockTimeout=2000&recursive=true&startingDirectoryMustExist=true cannot begin processing file: GenericFile[c:\Aplikace\src\mysql-connect
> org.springframework.dao.DuplicateKeyException: PreparedStatementCallback; SQL [INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)]; Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).; nested exception is java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).
> 	at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:247) ~[!/:?]
> 	at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70) ~[!/:?]
> 	at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1541) ~[!/:?]
> 	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667) ~[!/:?]
> 	at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960) ~[!/:?]
> 	at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015) ~[!/:?]
> 	at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1025) ~[!/:?]
> 	at org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository.insert(JdbcMessageIdRepository.java:118) ~[!/:3.10.0]
> 	at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:137) ~[!/:3.10.0]
> 	at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository$1.doInTransaction(AbstractJdbcMessageIdRepository.java:133) ~[!/:3.10.0]
> 	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[!/:?]
> 	at org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository.add(AbstractJdbcMessageIdRepository.java:133) ~[!/:3.10.0]
> 	at org.apache.camel.spi.IdempotentRepository.add(IdempotentRepository.java:95) ~[!/:3.10.0]
> 	at org.apache.camel.component.file.strategy.FileIdempotentChangedRepositoryReadLockStrategy.acquireExclusiveReadLock(FileIdempotentChangedRepositoryReadLockStrategy.java:89) ~[!/:3.10.0]
> 	at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.begin(GenericFileProcessStrategySupport.java:72) ~[!/:3.10.0]
> 	at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.begin(GenericFileRenameProcessStrategy.java:39) ~[!/:3.10.0]
> 	at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:389) [!/:3.10.0]
> 	at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:246) [!/:3.10.0]
> 	at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:207) [!/:3.10.0]
> 	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:190) [!/:3.10.0]
> 	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:107) [!/:3.10.0]
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_265]
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_265]
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_265]
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_265]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_265]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_265]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: java.sql.SQLException: Violation of UNIQUE KEY constraint 'uMessageId'. Cannot insert duplicate key in object 'dbo.CAMEL_MESSAGEPROCESSED'. The duplicate key value is (c:\Aplikace\src\file-currently-transfered-by-other-node).
> 	at net.sourceforge.jtds.jdbc.SQLDiagnostic.addDiagnostic(SQLDiagnostic.java:372) ~[!/:?]
> 	at net.sourceforge.jtds.jdbc.TdsCore.tdsErrorToken(TdsCore.java:2988) ~[!/:?]
> 	at net.sourceforge.jtds.jdbc.TdsCore.nextToken(TdsCore.java:2421) ~[!/:?]
> 	at net.sourceforge.jtds.jdbc.TdsCore.getMoreResults(TdsCore.java:671) ~[!/:?]
> 	at net.sourceforge.jtds.jdbc.JtdsStatement.processResults(JtdsStatement.java:613) ~[!/:?]
> 	at net.sourceforge.jtds.jdbc.JtdsStatement.executeSQL(JtdsStatement.java:572) ~[!/:?]
> 	at net.sourceforge.jtds.jdbc.JtdsPreparedStatement.executeUpdate(JtdsPreparedStatement.java:727) ~[!/:?]
> 	at org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:965) ~[!/:?]
> 	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651) ~[!/:?]
> 	... 24 more
> {noformat}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)