You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Ramarajan R (Jira)" <ji...@apache.org> on 2022/09/09 11:20:00 UTC

[jira] [Comment Edited] (CAMEL-18476) when artemis streaming enabled then Camel-file component is not able to delete the file after its archived in windows

    [ https://issues.apache.org/jira/browse/CAMEL-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602271#comment-17602271 ] 

Ramarajan R edited comment on CAMEL-18476 at 9/9/22 11:19 AM:
--------------------------------------------------------------

Hi [~davsclaus] 

In JmsBinding.class, the input stream is not closed for the Bytes message type, whereas for the Stream message type the input stream is closed.

 

            case Bytes: {
                BytesMessage message = session.createBytesMessage();
                if (body != null) {
                    try {
                        if (endpoint.isArtemisStreamingEnabled()) {
                            LOG.trace("Optimised for Artemis: Streaming payload in BytesMessage");
                            _InputStream is = context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, body);_
                            _message.setObjectProperty("JMS_AMQ_InputStream", is);_
                            _LOG.trace("Optimised for Artemis: Finished streaming payload in BytesMessage");_
                        } else {
                            byte[] payload = context.getTypeConverter().mandatoryConvertTo(byte[].class, exchange, body);
                            message.writeBytes(payload);
                        }
                    } catch (NoTypeConversionAvailableException e) {
                        // cannot convert to inputstream then thrown an exception to avoid sending a null message
                        JMSException cause = new MessageFormatException(e.getMessage());
                        cause.initCause(e);
                        throw cause;
                    }
                }
                return message;
            }
           

            case Stream: {
                StreamMessage message = session.createStreamMessage();
                if (body != null) {
                    long size = 0;
                    InputStream is = null;
                    try {
                        is = context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, body);
                        LOG.trace("Writing payload in StreamMessage");
                        // assume streaming is bigger payload so use same buffer size as the file component
                        byte[] buffer = new byte[FileUtil.BUFFER_SIZE];
                        int len = 0;
                        int count = 0;
                        while (len >= 0) {
                            count++;
                            len = is.read(buffer);
                            if (len >= 0) {
                                size += len;
                                LOG.trace("Writing payload chunk {} as bytes in StreamMessage", count);
                                message.writeBytes(buffer, 0, len);
                            }
                        }
                        LOG.trace("Finished writing payload (size {}) as bytes in StreamMessage", size);
                    } catch (NoTypeConversionAvailableException | IOException e) {
                        // cannot convert to inputstream then thrown an exception to avoid sending a null message
                        JMSException cause = new MessageFormatException(e.getMessage());
                        cause.initCause(e);
                        throw cause;
                    } finally {
                        *IOHelper.close(is);*
                    }

                }
                return message;
            }


was (Author: ramarajan):
Hi [~davsclaus] 

In JmsBinding.class, the input stream is not closed for the Bytes message type, whereas for the Stream message type the input stream is closed.

 

            case Bytes: {
                BytesMessage message = session.createBytesMessage();
                if (body != null) {
                    try {
                        if (endpoint.isArtemisStreamingEnabled()) {
                            LOG.trace("Optimised for Artemis: Streaming payload in BytesMessage");
                            _InputStream is = context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, body);_
                            _message.setObjectProperty("JMS_AMQ_InputStream", is);_
                            _LOG.trace("Optimised for Artemis: Finished streaming payload in BytesMessage");_
                        } else {
                            byte[] payload = context.getTypeConverter().mandatoryConvertTo(byte[].class, exchange, body);
                            message.writeBytes(payload);
                        }
                    } catch (NoTypeConversionAvailableException e) {
                        // cannot convert to inputstream then thrown an exception to avoid sending a null message
                        JMSException cause = new MessageFormatException(e.getMessage());
                        cause.initCause(e);
                        throw cause;
                    }
                }
                return message;
            }
            case Map: {
                MapMessage message = session.createMapMessage();
                if (body != null) {
                    Map<?, ?> payload = context.getTypeConverter().convertTo(Map.class, exchange, body);
                    populateMapMessage(message, payload, context);
                }
                return message;
            }
            case Object: {
                ObjectMessage message = session.createObjectMessage();
                if (body != null) {
                    try {
                        Serializable payload
                                = context.getTypeConverter().mandatoryConvertTo(Serializable.class, exchange, body);
                        message.setObject(payload);
                    } catch (NoTypeConversionAvailableException e) {
                        // cannot convert to serializable then thrown an exception to avoid sending a null message
                        JMSException cause = new MessageFormatException(e.getMessage());
                        cause.initCause(e);
                        throw cause;
                    }
                }
                return message;
            }
            case Stream: {
                StreamMessage message = session.createStreamMessage();
                if (body != null) {
                    long size = 0;
                    InputStream is = null;
                    try {
                        is = context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, body);
                        LOG.trace("Writing payload in StreamMessage");
                        // assume streaming is bigger payload so use same buffer size as the file component
                        byte[] buffer = new byte[FileUtil.BUFFER_SIZE];
                        int len = 0;
                        int count = 0;
                        while (len >= 0) {
                            count++;
                            len = is.read(buffer);
                            if (len >= 0) {
                                size += len;
                                LOG.trace("Writing payload chunk {} as bytes in StreamMessage", count);
                                message.writeBytes(buffer, 0, len);
                            }
                        }
                        LOG.trace("Finished writing payload (size {}) as bytes in StreamMessage", size);
                    } catch (NoTypeConversionAvailableException | IOException e) {
                        // cannot convert to inputstream then thrown an exception to avoid sending a null message
                        JMSException cause = new MessageFormatException(e.getMessage());
                        cause.initCause(e);
                        throw cause;
                    } finally {
                        *IOHelper.close(is);*
                    }

                }
                return message;
            }

> when artemis streaming enabled then Camel-file component is not able to delete the file after its archived in windows
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-18476
>                 URL: https://issues.apache.org/jira/browse/CAMEL-18476
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-file
>    Affects Versions: 3.14.3, 3.18.1
>            Reporter: Ramarajan R
>            Priority: Minor
>         Attachments: artemis-large-messages-final.zip, artemis-large-messages.zip
>
>
> I have used [https://github.com/apache/camel-examples/tree/main/examples/artemis-large-messages] using AMQ 7.8.0 Broker 
> Messages are processed, but after processing the file cannot be deleted from the source location. I tested on more than one Windows system and saw the same failure on each.
> I am using Camel 3.14.3 and Spring Boot 2.6.6 with the Java DSL,
> and also tested with Camel 3.18.1 and Spring Boot 2.7.3.
> When running with Camel XML the routes work fine, but we want to use the Java DSL. Kindly help.
> I raised this concern in the Zulip chat earlier.
> {{[https://camel.zulipchat.com/#narrow/stream/257298-camel/topic/camel-file.20artemis.20large.20message.20example.20failing.20in.20windows] }}{{{}tion-poc/inbox0] o.a.c.c.file.GenericFileOnCompletion : Error during commit. Exchange[7D4CFE29E3F7515-0000000000000003]. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Error renaming file from C:\opt\file-transfer-solution-poc\inbox0\test.xlsx to C:\opt\file-transfer-solution-poc\inbox0\.camel\test.xlsx]{}}}{{{}org.apache.camel.component.file.GenericFileOperationFailedException: Error renaming file from C:\opt\file-transfer-solution-poc\inbox0\test.xlsx to C:\opt\file-transfer-solution-poc\inbox0\.camel\test.xlsx{}}}
> {{at org.apache.camel.component.file.FileOperations.renameFile(FileOperations.java:93) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.renameFile(GenericFileProcessStrategySupport.java:145) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.commit(GenericFileRenameProcessStrategy.java:121) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.GenericFileOnCompletion.processStrategyCommit(GenericFileOnCompletion.java:134) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.GenericFileOnCompletion.onCompletion(GenericFileOnCompletion.java:86) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.GenericFileOnCompletion.onComplete(GenericFileOnCompletion.java:60) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:104) ~[camel-support-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:93) ~[camel-support-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:238) ~[camel-base-engine-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:61) ~[camel-support-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:777) ~[camel-base-engine-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:712) ~[camel-base-engine-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:263) ~[camel-base-engine-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44) ~[camel-api-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:193) ~[camel-base-engine-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64) ~[camel-base-engine-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.processor.Pipeline.process(Pipeline.java:184) ~[camel-core-processor-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:398) ~[camel-base-engine-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.14.3.jar:3.14.3]}}
> {{at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]}}
> {{at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]}}
> {{at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]}}
> {{at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]}}
> {{at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]}}
> {{at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]}}
> {{Caused by: java.io.IOException: Renaming file from 'C:\opt\file-transfer-solution-poc\inbox0\test.xlsx' to 'C:\opt\file-transfer-solution-poc\inbox0\.camel\test.xlsx'}}
> {{failed: Cannot delete file 'C:\opt\file-transfer-solution-poc\inbox0\test.xlsx' after copy succeeded}}
> {{at org.apache.camel.util.FileUtil.renameFileUsingCopy(FileUtil.java:464) ~[camel-util-3.14.3.jar:3.14.3]}}
> {{at org.apache.camel.component.file.FileOperations.renameFile(FileOperations.java:88) ~[camel-file-3.14.3.jar:3.14.3]}}
> {{... 28 common frames omitted}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)