You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Hiram Chirino <hi...@hiramchirino.com> on 2007/08/22 21:58:18 UTC

Asynchronous Exchange Processing

Hi,

Most of our components currently depend on synchronous processing of
the Exchange or bad things can happen.  For example the following does
not work:

from("file:/tmp/foo").to("seda:test");
from("seda:test").process( myProcessor );

Why? Because the file component deletes the file as soon as the
exchange returns from being sent to seda:test.  What would have been
nice is that file deletion did not occur until after the exchange is
processed by myProcessor.  But that's occurring in an asynchronous
thread.

Here's an idea that might help solve this problem.
Have the seda component call something like
   exchange.getExchangeFuture().done()
when the message is processed in its async thread.

and in the file component, have it call
   exchange.getExchangeFuture().get();
   // then the code that deletes the file
or
   exchange.getExchangeFuture().setCallback( new Callback() {
     public void done( Exchange exch ) {
        // then the code that deletes the file
     }
})

It's just a simple stab at a possible solution.  I have a feeling that
it's going to get more complicated since now we are forcing components
to be aware of the async processing model and we tend to copy
exchanges and do processing in pipelines etc. etc.  But I'm hoping to
get the conversation started on this topic.  What do you guys think?
Is there a simpler way to solve this?

Below you will find a simple patch that implements what the
exchange.getExchangeFuture() method might look like.

Index: camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java	(revision
0)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java	(revision
0)
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.Exchange;
+
+/**
+ * The DefaultExchangeFuture allows Exchanges to processed
asynchronously by allowing
+ * the producer and consumer of the exchange to signal each other so that it
+ * is known when processing has completed.
+ *
+ * For processors participating on asynchronous exchanges, once an
exchange has been
+ * processed, the DefaultExchangeFuture.done() method should be
called to let the exchange
+ * creator know that the exchange has been processed.
+ *
+ * For exchange creators (these are typically the Component
Consumers), if the exchange is being
+ * processes async, it should either wait for the exchange to get
completed asynchronously using
+ * one of the get() methods or it should register a Callback using
the setCallback() method.  Once
+ * the exchange is done, then he can destroy the originating event.
+ *
+ */
+public class DefaultExchangeFuture implements Future<Exchange> {
+
+    private final Exchange exchange;
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private boolean done;
+    private boolean canceled;
+    private Callback callback;
+
+    public static interface Callback {
+        void done(Exchange exchange);
+    }
+
+
+    /**
+     * @param exchange
+     */
+    public DefaultExchangeFuture(Exchange exchange) {
+        this.exchange = exchange;
+    }
+
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        boolean rc = false;
+        Callback c = null;
+
+        synchronized (this) {
+            if (!done && !canceled) {
+                c = callback;
+                canceled = true;
+                latch.countDown();
+                rc = true;
+            }
+        }
+
+        if (rc) {
+            latch.countDown();
+            if (c != null) {
+                c.done(exchange);
+            }
+        }
+        return rc;
+    }
+
+    public Exchange get() throws InterruptedException, ExecutionException {
+        latch.await();
+        synchronized (this) {
+            if (canceled) {
+                throw new CancellationException();
+            }
+            // TODO: We might want to do this..
+            // if (exception != null) {
+            // throw new ExecutionException(exception);
+            // }
+            return exchange;
+        }
+    }
+
+    public Exchange get(long timeout, TimeUnit unit) throws
InterruptedException, ExecutionException, TimeoutException {
+        latch.await(timeout, unit);
+        synchronized (this) {
+            if (canceled) {
+                throw new CancellationException();
+            }
+            // TODO: We might want to do this..
+            // if (exception != null) {
+            // throw new ExecutionException(exception);
+            // }
+            return exchange;
+        }
+    }
+
+    public synchronized boolean isCancelled() {
+        return canceled;
+    }
+
+    public synchronized boolean isDone() {
+        return done || canceled;
+    }
+
+    public synchronized Callback getCallback() {
+        return callback;
+    }
+
+    /**
+     * Registers a callback handler with the future if the future is not yet
+     * completed.
+     *
+     * @param callback
+     * @return false if the callback could not get registered due to the future
+     *         being done.
+     */
+    public synchronized boolean setCallback(Callback callback) {
+        if (isDone()) {
+            return false;
+        }
+        this.callback = callback;
+        return true;
+    }
+
+    public void done() throws CancellationException, IllegalStateException {
+        Callback c = null;
+        synchronized (this) {
+            if (canceled) {
+                throw new CancellationException();
+            }
+            if (done) {
+                throw new IllegalStateException("Exchange is allready done");
+            }
+            done = true;
+            c = callback;
+        }
+        latch.countDown();
+        if (c != null) {
+            c.done(exchange);
+        }
+    }
+}

Property changes on:
camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Index: camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java	(revision
568699)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java	(working
copy)
@@ -38,7 +38,8 @@
     private Message fault;
     private Throwable exception;
     private String exchangeId =
DefaultExchange.DEFAULT_ID_GENERATOR.generateId();
-
+    private final DefaultExchangeFuture exchangeFuture = new
DefaultExchangeFuture(this);
+
     public DefaultExchange(CamelContext context) {
         this.context = context;
     }
@@ -222,4 +223,9 @@
             messageSupport.setExchange(this);
         }
     }
+
+
+    DefaultExchangeFuture getExchangeFuture() {
+        return exchangeFuture;
+    }
 }


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Asynchronous Exchange Processing

Posted by Nodet Guillaume <gn...@gmail.com>.
On Aug 22, 2007, at 9:58 PM, Hiram Chirino wrote:

> Hi,
>
> Most of our components currently depend on synchronous processing of
> the Exchange or bad things can happen.  For example the following does
> not work:
>
> from("file:/tmp/foo").to("seda:test");
> from("seda:test").process( myProcessor );
>
> Why? because the file component delete the file as soon as the
> exchange returns from being sent to seda:test.  What would have been
> nice is that file deletion did not occur until after the exchange is
> processed by myProcessor.  But that's occuring in an asynchronous
> thread.
>
> Here's an idea that might help solve this problem.
> Have the seda component call something like
>    exchange.getExchangeFuture().done()
> when the message is processed in it's async thread.
>
> and in the file component, have it call
>    exchange.getExchangeFuture().get();
>    // then the code that deletes the file
> or
>    exchange.getExchangeFuture().setCallback( new Callback() {
>      public void done( Exchange exch ) {
>         // then the code that deletes the file
>      }
> })
>
> It's just a simple stab at a possible solution.  I got a feeling that
> it's going to get more complicated since now we are forcing components
> to be aware of the async processing model and we tend to copy
> exchanges and do processing in pipelines etc. etc.  But I'm hoping to
> get the conversation started on this topic.  What do you guys think??
> is there a simpler way to solve this?

The problem with the future / callback approach is that, while it can
handle asynchronous calls, it is a stateful API.  This is a limitation
that we must be aware of.
An example of a stateless asynchronous API would be something more like
JBI or ServiceMix.
But this obviously leads to a more complicated API.

JBI solves this problem by sending back the exchange with a DONE status
when it has been processed.  For files, another way would be to send  
a stream
wrapper and act when the stream is closed.

Cheers,
Guillaume Nodet

>
> Bellow you will find a simple patch that implements what the
> exchange.getExchangeFuture() method might look like.
>
> Index: camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java
> ===================================================================
> --- camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java	(revision
> 0)
> +++ camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java	(revision
> 0)
> @@ -0,0 +1,158 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file  
> distributed with
> + * this work for additional information regarding copyright  
> ownership.
> + * The ASF licenses this file to You under the Apache License,  
> Version 2.0
> + * (the "License"); you may not use this file except in compliance  
> with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,  
> software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or  
> implied.
> + * See the License for the specific language governing permissions  
> and
> + * limitations under the License.
> + */
> +package org.apache.camel.impl;
> +
> +import java.util.concurrent.CancellationException;
> +import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.ExecutionException;
> +import java.util.concurrent.Future;
> +import java.util.concurrent.TimeUnit;
> +import java.util.concurrent.TimeoutException;
> +
> +import org.apache.camel.Exchange;
> +
> +/**
> + * The DefaultExchangeFuture allows Exchanges to processed
> asynchronously by allowing
> + * the producer and consumer of the exchange to signal each other  
> so that it
> + * is known when processing has completed.
> + *
> + * For processors participating on asynchronous exchanges, once an
> exchange has been
> + * processed, the DefaultExchangeFuture.done() method should be
> called to let the exchange
> + * creator know that the exchange has been processed.
> + *
> + * For exchange creators (these are typically the Component
> Consumers), if the exchange is being
> + * processes async, it should either wait for the exchange to get
> completed asynchronously using
> + * one of the get() methods or it should register a Callback using
> the setCallback() method.  Once
> + * the exchange is done, then he can destroy the originating event.
> + *
> + */
> +public class DefaultExchangeFuture implements Future<Exchange> {
> +
> +    private final Exchange exchange;
> +    private final CountDownLatch latch = new CountDownLatch(1);
> +    private boolean done;
> +    private boolean canceled;
> +    private Callback callback;
> +
> +    public static interface Callback {
> +        void done(Exchange exchange);
> +    }
> +
> +
> +    /**
> +     * @param exchange
> +     */
> +    public DefaultExchangeFuture(Exchange exchange) {
> +        this.exchange = exchange;
> +    }
> +
> +    public boolean cancel(boolean mayInterruptIfRunning) {
> +        boolean rc = false;
> +        Callback c = null;
> +
> +        synchronized (this) {
> +            if (!done && !canceled) {
> +                c = callback;
> +                canceled = true;
> +                latch.countDown();
> +                rc = true;
> +            }
> +        }
> +
> +        if (rc) {
> +            latch.countDown();
> +            if (c != null) {
> +                c.done(exchange);
> +            }
> +        }
> +        return rc;
> +    }
> +
> +    public Exchange get() throws InterruptedException,  
> ExecutionException {
> +        latch.await();
> +        synchronized (this) {
> +            if (canceled) {
> +                throw new CancellationException();
> +            }
> +            // TODO: We might want to do this..
> +            // if (exception != null) {
> +            // throw new ExecutionException(exception);
> +            // }
> +            return exchange;
> +        }
> +    }
> +
> +    public Exchange get(long timeout, TimeUnit unit) throws
> InterruptedException, ExecutionException, TimeoutException {
> +        latch.await(timeout, unit);
> +        synchronized (this) {
> +            if (canceled) {
> +                throw new CancellationException();
> +            }
> +            // TODO: We might want to do this..
> +            // if (exception != null) {
> +            // throw new ExecutionException(exception);
> +            // }
> +            return exchange;
> +        }
> +    }
> +
> +    public synchronized boolean isCancelled() {
> +        return canceled;
> +    }
> +
> +    public synchronized boolean isDone() {
> +        return done || canceled;
> +    }
> +
> +    public synchronized Callback getCallback() {
> +        return callback;
> +    }
> +
> +    /**
> +     * Registers a callback handler with the future if the future  
> is not yet
> +     * completed.
> +     *
> +     * @param callback
> +     * @return false if the callback could not get registered due  
> to the future
> +     *         being done.
> +     */
> +    public synchronized boolean setCallback(Callback callback) {
> +        if (isDone()) {
> +            return false;
> +        }
> +        this.callback = callback;
> +        return true;
> +    }
> +
> +    public void done() throws CancellationException,  
> IllegalStateException {
> +        Callback c = null;
> +        synchronized (this) {
> +            if (canceled) {
> +                throw new CancellationException();
> +            }
> +            if (done) {
> +                throw new IllegalStateException("Exchange is  
> allready done");
> +            }
> +            done = true;
> +            c = callback;
> +        }
> +        latch.countDown();
> +        if (c != null) {
> +            c.done(exchange);
> +        }
> +    }
> +}
>
> Property changes on:
> camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java
> ___________________________________________________________________
> Name: svn:keywords
>    + Rev Date
> Name: svn:eol-style
>    + native
>
> Index: camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchange.java
> ===================================================================
> --- camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchange.java	(revision
> 568699)
> +++ camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchange.java	(working
> copy)
> @@ -38,7 +38,8 @@
>      private Message fault;
>      private Throwable exception;
>      private String exchangeId =
> DefaultExchange.DEFAULT_ID_GENERATOR.generateId();
> -
> +    private final DefaultExchangeFuture exchangeFuture = new
> DefaultExchangeFuture(this);
> +
>      public DefaultExchange(CamelContext context) {
>          this.context = context;
>      }
> @@ -222,4 +223,9 @@
>              messageSupport.setExchange(this);
>          }
>      }
> +
> +
> +    DefaultExchangeFuture getExchangeFuture() {
> +        return exchangeFuture;
> +    }
>  }
>
>
> -- 
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com


Re: Asynchronous Exchange Processing

Posted by Guillaume Nodet <gn...@gmail.com>.
I somewhat agree with that. I think it would simplify the API to have

exchange.onCommit(new Runnable() {
  public void run() {
    // do whatever
  }
};

or

exchange.onCommit(new Callback<Exchange>() {
  public void handle(Exchange exchange) {
    // do whatever
  }
};

rather than

exchange.getUnitOfWork().addSynchronization(new Synchronization() {
  public void onComplete(Exchange exchange) {
    // do whatever
  }
  public void onFailure(Exchange exchange) {
  }
});

But this is no big deal.
The main point is to have it and to have the whole core support
asynchronous
processors.  I'll try to work on that soon.

On 8/29/07, Brian McCallister <br...@skife.org> wrote:
>
> I would consider just saying exchange.on[Commit|Fail] at the user level.
>
> The reason here is that the Exchange represents... an exchange
> between endpoints. it *is* the unit of work. Implying that there is a
> separate transaction it is operating in muddies the waters.
>
> -Brian
>
> On Aug 29, 2007, at 1:12 AM, James Strachan wrote:
>
> > Incidentally as a first stab at tackling CAMEL-123
> > https://issues.apache.org/activemq/browse/CAMEL-123
> >
> > I've added a UnitOfWork object on an Exchange that can be used to
> > register onComplete/onFailure hooks when processing of a message
> > exchange has been completed. Its not yet wired into the sync/async
> > changes Hiram made though but gives an idea of what I was wondering
> > about.
> >
> > If you take a look at the DefaultUnitOfWork.addAsyncStep() - I was
> > kinda wondering if we could use the UnitOfWork object as a factory of
> > AsyncCallback objects when working with async stuff; so that by
> > default when all the async callbacks complete, the UnitOfWork's
> > Synchronization handlers could be invoked (e.g. the onComplete /
> > onFailure handlers). Then my next thought was could we move a bit more
> > of the code from the changes to Pipeline out into the UnitOfWork
> > class; still a bit embyonic my thoughts though - its complex stuff
> > this sync v async v transactions stuff :)
> >
> > --
> > James
> > -------
> > http://macstrac.blogspot.com/
>
>


-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/

Re: Asynchronous Exchange Processing

Posted by Brian McCallister <br...@skife.org>.
I would consider just saying exchange.on[Commit|Fail] at the user level.

The reason here is that the Exchange represents... an exchange
between endpoints. It *is* the unit of work. Implying that there is a
separate transaction it is operating in muddies the waters.

-Brian

On Aug 29, 2007, at 1:12 AM, James Strachan wrote:

> Incidentally as a first stab at tackling CAMEL-123
> https://issues.apache.org/activemq/browse/CAMEL-123
>
> I've added a UnitOfWork object on an Exchange that can be used to
> register onComplete/onFailure hooks when processing of a message
> exchange has been completed. Its not yet wired into the sync/async
> changes Hiram made though but gives an idea of what I was wondering
> about.
>
> If you take a look at the DefaultUnitOfWork.addAsyncStep() - I was
> kinda wondering if we could use the UnitOfWork object as a factory of
> AsyncCallback objects when working with async stuff; so that by
> default when all the async callbacks complete, the UnitOfWork's
> Synchronization handlers could be invoked (e.g. the onComplete /
> onFailure handlers). Then my next thought was could we move a bit more
> of the code from the changes to Pipeline out into the UnitOfWork
> class; still a bit embyonic my thoughts though - its complex stuff
> this sync v async v transactions stuff :)
>
> -- 
> James
> -------
> http://macstrac.blogspot.com/


Re: Asynchronous Exchange Processing

Posted by James Strachan <ja...@gmail.com>.
Incidentally as a first stab at tackling CAMEL-123
https://issues.apache.org/activemq/browse/CAMEL-123

I've added a UnitOfWork object on an Exchange that can be used to
register onComplete/onFailure hooks when processing of a message
exchange has been completed. It's not yet wired into the sync/async
changes Hiram made, though, but gives an idea of what I was wondering
about.

If you take a look at the DefaultUnitOfWork.addAsyncStep() - I was
kinda wondering if we could use the UnitOfWork object as a factory of
AsyncCallback objects when working with async stuff; so that by
default when all the async callbacks complete, the UnitOfWork's
Synchronization handlers could be invoked (e.g. the onComplete /
onFailure handlers). Then my next thought was could we move a bit more
of the code from the changes to Pipeline out into the UnitOfWork
class; my thoughts are still a bit embryonic though - it's complex stuff,
this sync v async v transactions stuff :)

-- 
James
-------
http://macstrac.blogspot.com/

Re: Asynchronous Exchange Processing

Posted by Hiram Chirino <hi...@hiramchirino.com>.
will do.

On 8/28/07, James Strachan <ja...@gmail.com> wrote:
> BTW am thinking - this patch doesn't affect too much yet - mostly the
> seda & file components along with the Pipeline - so how about we
> commit it; then we can experiment with different ways to improve it as
> we also try fix CAMEL-123?
>
>
>
> On 8/28/07, James Strachan <ja...@gmail.com> wrote:
> > On 8/28/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > > Got those test failures fixed now..  Here is a better version of the patch.
> >
> > Great stuff!
> >
> > I've been thinking we need a way to register onComplete / onFail
> > hooks. (Rather like TransactionSynchronization in Spring)...
> > http://static.springframework.org/spring/docs/2.0.x/api/org/springframework/transaction/support/TransactionSynchronization.html
> >
> > So allowing something like
> >
> > exchange.getTransaction().addSynchronization(new Synchronization() {
> >   pubic void onCommit() {
> >      //  remove the file...
> >   }
> >
> >   public void onRollback() {
> >      ///
> >   }
> > }
> >
> > https://issues.apache.org/activemq/browse/CAMEL-123
> >
> > If we had some kinda Transaction object, which was propogated across
> > any copied exchanges (e.g. a new exchange for each async operation or
> > when using multicast etc), then we'd have a place we could register
> > these kinds of onCommit/onRollback handlers. Then each component in
> > the pipeline - whether file or ftp or whatever, could add their own
> > onCommit/onRollback handlers etc.
> >
> > If we had this single Transaction object which is properly propogated,
> > maybe that could also take over some of the work doing the tracking of
> > the number of async steps per transaction as well as being the
> > AsyncCallback - to contain the count down latch and so forth. So
> > moving some of the code from Pipeline into this single place, this
> > Transaction object - which hopefully could make it a bit easier to
> > handle async processors in some of the other processors with minimal
> > code etc.
> >
> > Am just wondering if we can minimise the amount of work required in
> > the pipeline/processor code to support async handling.
> >
> > James
> > -------
> > http://macstrac.blogspot.com/
> >
>
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Asynchronous Exchange Processing

Posted by James Strachan <ja...@gmail.com>.
BTW am thinking - this patch doesn't affect too much yet - mostly the
seda & file components along with the Pipeline - so how about we
commit it; then we can experiment with different ways to improve it as
we also try to fix CAMEL-123?



On 8/28/07, James Strachan <ja...@gmail.com> wrote:
> On 8/28/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > Got those test failures fixed now..  Here is a better version of the patch.
>
> Great stuff!
>
> I've been thinking we need a way to register onComplete / onFail
> hooks. (Rather like TransactionSynchronization in Spring)...
> http://static.springframework.org/spring/docs/2.0.x/api/org/springframework/transaction/support/TransactionSynchronization.html
>
> So allowing something like
>
> exchange.getTransaction().addSynchronization(new Synchronization() {
>   pubic void onCommit() {
>      //  remove the file...
>   }
>
>   public void onRollback() {
>      ///
>   }
> }
>
> https://issues.apache.org/activemq/browse/CAMEL-123
>
> If we had some kinda Transaction object, which was propogated across
> any copied exchanges (e.g. a new exchange for each async operation or
> when using multicast etc), then we'd have a place we could register
> these kinds of onCommit/onRollback handlers. Then each component in
> the pipeline - whether file or ftp or whatever, could add their own
> onCommit/onRollback handlers etc.
>
> If we had this single Transaction object which is properly propogated,
> maybe that could also take over some of the work doing the tracking of
> the number of async steps per transaction as well as being the
> AsyncCallback - to contain the count down latch and so forth. So
> moving some of the code from Pipeline into this single place, this
> Transaction object - which hopefully could make it a bit easier to
> handle async processors in some of the other processors with minimal
> code etc.
>
> Am just wondering if we can minimise the amount of work required in
> the pipeline/processor code to support async handling.
>
> James
> -------
> http://macstrac.blogspot.com/
>


-- 
James
-------
http://macstrac.blogspot.com/

Re: Asynchronous Exchange Processing

Posted by James Strachan <ja...@gmail.com>.
On 8/28/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> Got those test failures fixed now..  Here is a better version of the patch.

Great stuff!

I've been thinking we need a way to register onComplete / onFail
hooks. (Rather like TransactionSynchronization in Spring)...
http://static.springframework.org/spring/docs/2.0.x/api/org/springframework/transaction/support/TransactionSynchronization.html

So allowing something like

exchange.getTransaction().addSynchronization(new Synchronization() {
  pubic void onCommit() {
     //  remove the file...
  }

  public void onRollback() {
     ///
  }
}

https://issues.apache.org/activemq/browse/CAMEL-123

If we had some kinda Transaction object, which was propagated across
any copied exchanges (e.g. a new exchange for each async operation or
when using multicast etc), then we'd have a place we could register
these kinds of onCommit/onRollback handlers. Then each component in
the pipeline - whether file or ftp or whatever, could add their own
onCommit/onRollback handlers etc.

If we had this single Transaction object which is properly propagated,
maybe that could also take over some of the work of tracking
the number of async steps per transaction as well as being the
AsyncCallback - to contain the count down latch and so forth. So
moving some of the code from Pipeline into this single place, this
Transaction object - which hopefully could make it a bit easier to
handle async processors in some of the other processors with minimal
code etc.

Am just wondering if we can minimise the amount of work required in
the pipeline/processor code to support async handling.

James
-------
http://macstrac.blogspot.com/

Re: Asynchronous Exchange Processing

Posted by Hiram Chirino <hi...@hiramchirino.com>.
Got those test failures fixed now..  Here is a better version of the patch.

On 8/28/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> Hey..
>
> I prototyped an initial implementation of this.  I'm going to attach
> that patch for review and comments.  It's still kind of rough and I
> broke a few other test cases.  I'll work on fixing that later today
> but this does solve the async processing from a file:// endpoint
> problem that this thread originally described.  Just run the included
> FileAsyncRouteTest case.
>
> Regards,
> Hiram
>
> On 8/23/07, James Strachan <ja...@gmail.com> wrote:
> > On 8/23/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > > On 8/23/07, James Strachan <ja...@gmail.com> wrote:
> > > > On 8/22/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > > > > Hi,
> > > > >
> > > > > Most of our components currently depend on synchronous processing of
> > > > > the Exchange or bad things can happen.  For example the following does
> > > > > not work:
> > > > >
> > > > > from("file:/tmp/foo").to("seda:test");
> > > > > from("seda:test").process( myProcessor );
> > > > >
> > > > > Why? because the file component delete the file as soon as the
> > > > > exchange returns from being sent to seda:test.  What would have been
> > > > > nice is that file deletion did not occur until after the exchange is
> > > > > processed by myProcessor.  But that's occuring in an asynchronous
> > > > > thread.
> > > > >
> > > > > Here's an idea that might help solve this problem.
> > > > > Have the seda component call something like
> > > > >    exchange.getExchangeFuture().done()
> > > > > when the message is processed in it's async thread.
> > > > >
> > > > > and in the file component, have it call
> > > > >    exchange.getExchangeFuture().get();
> > > > >    // then the code that deletes the file
> > > > > or
> > > > >    exchange.getExchangeFuture().setCallback( new Callback() {
> > > > >      public void done( Exchange exch ) {
> > > > >         // then the code that deletes the file
> > > > >      }
> > > > > })
> > > >
> > > > I was pondering about this with relation to this thread the other day...
> > > > http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html
> > > >
> > > > I definitely think we need a standard way to register
> > > > post-commit/rollback hooks. i.e. on completion of processing (either
> > > > on a commit/completed or rollback/failed) allow a
> > > > processor/consumer/producer to register some logic such as to delete a
> > > > file, flush some cache etc. Note this is mostly required for
> > > > non-transactional things. e.g. in JPA and JMS we can just use
> > > > transactions for this.
> > >
> > > Actually transaction things are easy since they require all processing
> > > in the transaction to be done synchronously.
> >
> > You could suspend the transaction and resume it in another thread; its
> > rarely done but it is a possible approach.
> >
> >
> > > The hard bit is
> > > processing the exchanges async.
> >
> > Yeah - async is hard full stop I think :)
> >
> >
> > > > I'm kinda wondering; should we just try make things like files, FTP
> > > > and the like transactional; that is to say, we implement transaction
> > > > hooks so that we can do a file 'delete/rename' which is registered as
> > > > a transaction commit status listener? Just registering some kind of
> > > > onCommit/onRollback callbacks would do the trick though as you
> > > > suggest.
> > > >
> > >
> > > I don't like the idea of making this looks like transaction semantics
> > > when it's not.  Traditional transaction semantics force you to do
> > > processing synchronously.  And the point of this is exactly the
> > > opposite.
> >
> > I could counter that by saying I don't like 2 different mechanisms to
> > describe a 'unit of work' with callback mechanisms for knowing when it
> > completes successfully or fails. i.e. having transaction callbacks and
> > async callbacks; we should have just one really.
> >
> > e.g. using synchronous processing, I might want to process a file and
> > do a JDBC insert; only if the transaction commits do I want to delete
> > the file.
> >
> > So I think being able to have file operations work nicely with
> > transactions (whether in sync or async mode) is a pretty common
> > requirement.
> >
> > The issue though is; should we treat async processing as
> > suspending/resuming a transaction or not (as async processing of
> > transactions is tricky).
> >
> >
> > > > There's a second issue which is asynchronous processing; such as a
> > > > producer invoking an asynchronous processor then wanting some kind of
> > > > callback that the processing has completed. I wanted to make the easy
> > > > things really easy with Camel; so was a bit reluctant to add
> > > > asynchronous processing explicitly from the start for fear of making
> > > > the API very complex; most components afterall tend to be synchronous
> > > > (which makes transactions very easy to do too btw).
> > > >
> > >
> > > I agree with this..  and this is my greatest fear.  We need to make
> > > sure that the synchronous components stay as simple as they are today.
> > >  But allow async aware components support having their exchanges be
> > > processed async.
> >
> > Yeah. I think there's gonna be few of 'em that are truly async too btw
> > - so only a few component ninja's will have to worry about that.
> >
> >
> > > > I was thinking we could add some optional API for AsyncProcessor which
> > > > is-a Processor but adds an asynchronous invocation API style; rather
> > > > like the Channel does in the ServiceMix 4 API...
> > > >
> > > > // sync API
> > > > interface Processor {
> > > >         void    process(Exchange exchange);
> > > > }
> > > >
> > > > interface AsyncProcessor extends Processor {
> > > >   // async methods
> > > >   Future<Exchange>      processAsync(Exchange exchange)
> > > >   Future<Exchange>      processsync(Exchange exchange, AsyncHandler handler)
> > > > }
> > > >
> > > > Then rather than adding a kinda done() method to the Exchange and
> > > > calling it throughout every single producer/consumer/Processor
> > > > implementation; we could just use the Future object to know when a
> > > > particular asynchronous operation has completed. i.e. keep the async
> > > > API to the side, for those rare cases folks really wanna use it -
> > > > otherwise we can all stick to the simple sync API that works easily
> > > > with transactions.
> > > >
> > >
> > > This might be a good option.  I think that we don't need the
> > > "Future<Exchange>      processAsync(Exchange exchange)" call since to
> > > make an exchange async you just need to route it through a seda:
> > > component.
> > >
> > > so perhaps we just add:
> > > Future<Exchange>      processs(Exchange exchange, AsyncHandler handler)
> > >
> > > If the path of the exchange is sync, the it's a blocking call and by
> > > the time it returns the Future will be done.  But it reached an async
> > > component like seda: then it will return without the Future being
> > > completed.
> > >
> > > > Thoughts?
> > >
> > > Sounds like like a good approach...  Perhaps I'll prototype it..
> >
> > Go for it! :)
> >
> > --
> > James
> > -------
> > http://macstrac.blogspot.com/
> >
>
>
> --
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com
>
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Asynchronous Exchange Processing

Posted by Hiram Chirino <hi...@hiramchirino.com>.
Hey..

so I prototyped an initial implementation of this.  I'm going to attach
that patch for review and comments.  It's still kind of rough and I
broke a few other test cases.  I'll work on fixing that later today
but this does solve the async processing from a file:// endpoint
problem that this thread originally described.  Just run the included
FileAsyncRouteTest case.

Regards,
Hiram

On 8/23/07, James Strachan <ja...@gmail.com> wrote:
> On 8/23/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > On 8/23/07, James Strachan <ja...@gmail.com> wrote:
> > > On 8/22/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > > > Hi,
> > > >
> > > > Most of our components currently depend on synchronous processing of
> > > > the Exchange or bad things can happen.  For example the following does
> > > > not work:
> > > >
> > > > from("file:/tmp/foo").to("seda:test");
> > > > from("seda:test").process( myProcessor );
> > > >
> > > > Why? because the file component delete the file as soon as the
> > > > exchange returns from being sent to seda:test.  What would have been
> > > > nice is that file deletion did not occur until after the exchange is
> > > > processed by myProcessor.  But that's occuring in an asynchronous
> > > > thread.
> > > >
> > > > Here's an idea that might help solve this problem.
> > > > Have the seda component call something like
> > > >    exchange.getExchangeFuture().done()
> > > > when the message is processed in it's async thread.
> > > >
> > > > and in the file component, have it call
> > > >    exchange.getExchangeFuture().get();
> > > >    // then the code that deletes the file
> > > > or
> > > >    exchange.getExchangeFuture().setCallback( new Callback() {
> > > >      public void done( Exchange exch ) {
> > > >         // then the code that deletes the file
> > > >      }
> > > > })
> > >
> > > I was pondering about this with relation to this thread the other day...
> > > http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html
> > >
> > > I definitely think we need a standard way to register
> > > post-commit/rollback hooks. i.e. on completion of processing (either
> > > on a commit/completed or rollback/failed) allow a
> > > processor/consumer/producer to register some logic such as to delete a
> > > file, flush some cache etc. Note this is mostly required for
> > > non-transactional things. e.g. in JPA and JMS we can just use
> > > transactions for this.
> >
> > Actually transaction things are easy since they require all processing
> > in the transaction to be done synchronously.
>
> You could suspend the transaction and resume it in another thread; its
> rarely done but it is a possible approach.
>
>
> > The hard bit is
> > processing the exchanges async.
>
> Yeah - async is hard full stop I think :)
>
>
> > > I'm kinda wondering; should we just try make things like files, FTP
> > > and the like transactional; that is to say, we implement transaction
> > > hooks so that we can do a file 'delete/rename' which is registered as
> > > a transaction commit status listener? Just registering some kind of
> > > onCommit/onRollback callbacks would do the trick though as you
> > > suggest.
> > >
> >
> > I don't like the idea of making this looks like transaction semantics
> > when it's not.  Traditional transaction semantics force you to do
> > processing synchronously.  And the point of this is exactly the
> > opposite.
>
> I could counter that by saying I don't like 2 different mechanisms to
> describe a 'unit of work' with callback mechanisms for knowing when it
> completes successfully or fails. i.e. having transaction callbacks and
> async callbacks; we should have just one really.
>
> e.g. using synchronous processing, I might want to process a file and
> do a JDBC insert; only if the transaction commits do I want to delete
> the file.
>
> So I think being able to have file operations work nicely with
> transactions (whether in sync or async mode) is a pretty common
> requirement.
>
> The issue though is; should we treat async processing as
> suspending/resuming a transaction or not (as async processing of
> transactions is tricky).
>
>
> > > There's a second issue which is asynchronous processing; such as a
> > > producer invoking an asynchronous processor then wanting some kind of
> > > callback that the processing has completed. I wanted to make the easy
> > > things really easy with Camel; so was a bit reluctant to add
> > > asynchronous processing explicitly from the start for fear of making
> > > the API very complex; most components afterall tend to be synchronous
> > > (which makes transactions very easy to do too btw).
> > >
> >
> > I agree with this..  and this is my greatest fear.  We need to make
> > sure that the synchronous components stay as simple as they are today.
> >  But allow async aware components support having their exchanges be
> > processed async.
>
> Yeah. I think there's gonna be few of 'em that are truly async too btw
> - so only a few component ninja's will have to worry about that.
>
>
> > > I was thinking we could add some optional API for AsyncProcessor which
> > > is-a Processor but adds an asynchronous invocation API style; rather
> > > like the Channel does in the ServiceMix 4 API...
> > >
> > > // sync API
> > > interface Processor {
> > >         void    process(Exchange exchange);
> > > }
> > >
> > > interface AsyncProcessor extends Processor {
> > >   // async methods
> > >   Future<Exchange>      processAsync(Exchange exchange)
> > >   Future<Exchange>      processsync(Exchange exchange, AsyncHandler handler)
> > > }
> > >
> > > Then rather than adding a kinda done() method to the Exchange and
> > > calling it throughout every single producer/consumer/Processor
> > > implementation; we could just use the Future object to know when a
> > > particular asynchronous operation has completed. i.e. keep the async
> > > API to the side, for those rare cases folks really wanna use it -
> > > otherwise we can all stick to the simple sync API that works easily
> > > with transactions.
> > >
> >
> > This might be a good option.  I think that we don't need the
> > "Future<Exchange>      processAsync(Exchange exchange)" call since to
> > make an exchange async you just need to route it through a seda:
> > component.
> >
> > so perhaps we just add:
> > Future<Exchange>      processs(Exchange exchange, AsyncHandler handler)
> >
> > If the path of the exchange is sync, the it's a blocking call and by
> > the time it returns the Future will be done.  But it reached an async
> > component like seda: then it will return without the Future being
> > completed.
> >
> > > Thoughts?
> >
> > Sounds like like a good approach...  Perhaps I'll prototype it..
>
> Go for it! :)
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Asynchronous Exchange Processing

Posted by James Strachan <ja...@gmail.com>.
On 8/23/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> On 8/23/07, James Strachan <ja...@gmail.com> wrote:
> > On 8/22/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > > Hi,
> > >
> > > Most of our components currently depend on synchronous processing of
> > > the Exchange or bad things can happen.  For example the following does
> > > not work:
> > >
> > > from("file:/tmp/foo").to("seda:test");
> > > from("seda:test").process( myProcessor );
> > >
> > > Why? because the file component delete the file as soon as the
> > > exchange returns from being sent to seda:test.  What would have been
> > > nice is that file deletion did not occur until after the exchange is
> > > processed by myProcessor.  But that's occuring in an asynchronous
> > > thread.
> > >
> > > Here's an idea that might help solve this problem.
> > > Have the seda component call something like
> > >    exchange.getExchangeFuture().done()
> > > when the message is processed in it's async thread.
> > >
> > > and in the file component, have it call
> > >    exchange.getExchangeFuture().get();
> > >    // then the code that deletes the file
> > > or
> > >    exchange.getExchangeFuture().setCallback( new Callback() {
> > >      public void done( Exchange exch ) {
> > >         // then the code that deletes the file
> > >      }
> > > })
> >
> > I was pondering about this with relation to this thread the other day...
> > http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html
> >
> > I definitely think we need a standard way to register
> > post-commit/rollback hooks. i.e. on completion of processing (either
> > on a commit/completed or rollback/failed) allow a
> > processor/consumer/producer to register some logic such as to delete a
> > file, flush some cache etc. Note this is mostly required for
> > non-transactional things. e.g. in JPA and JMS we can just use
> > transactions for this.
>
> Actually transaction things are easy since they require all processing
> in the transaction to be done synchronously.

You could suspend the transaction and resume it in another thread; it's
rarely done but it is a possible approach.


> The hard bit is
> processing the exchanges async.

Yeah - async is hard full stop I think :)


> > I'm kinda wondering; should we just try make things like files, FTP
> > and the like transactional; that is to say, we implement transaction
> > hooks so that we can do a file 'delete/rename' which is registered as
> > a transaction commit status listener? Just registering some kind of
> > onCommit/onRollback callbacks would do the trick though as you
> > suggest.
> >
>
> I don't like the idea of making this looks like transaction semantics
> when it's not.  Traditional transaction semantics force you to do
> processing synchronously.  And the point of this is exactly the
> opposite.

I could counter that by saying I don't like 2 different mechanisms to
describe a 'unit of work' with callback mechanisms for knowing when it
completes successfully or fails. i.e. having transaction callbacks and
async callbacks; we should have just one really.

e.g. using synchronous processing, I might want to process a file and
do a JDBC insert; only if the transaction commits do I want to delete
the file.

So I think being able to have file operations work nicely with
transactions (whether in sync or async mode) is a pretty common
requirement.

The issue though is; should we treat async processing as
suspending/resuming a transaction or not (as async processing of
transactions is tricky).


> > There's a second issue which is asynchronous processing; such as a
> > producer invoking an asynchronous processor then wanting some kind of
> > callback that the processing has completed. I wanted to make the easy
> > things really easy with Camel; so was a bit reluctant to add
> > asynchronous processing explicitly from the start for fear of making
> > the API very complex; most components afterall tend to be synchronous
> > (which makes transactions very easy to do too btw).
> >
>
> I agree with this..  and this is my greatest fear.  We need to make
> sure that the synchronous components stay as simple as they are today.
>  But allow async aware components support having their exchanges be
> processed async.

Yeah. I think there's gonna be few of 'em that are truly async too btw
- so only a few component ninjas will have to worry about that.


> > I was thinking we could add some optional API for AsyncProcessor which
> > is-a Processor but adds an asynchronous invocation API style; rather
> > like the Channel does in the ServiceMix 4 API...
> >
> > // sync API
> > interface Processor {
> >         void    process(Exchange exchange);
> > }
> >
> > interface AsyncProcessor extends Processor {
> >   // async methods
> >   Future<Exchange>      processAsync(Exchange exchange)
> >   Future<Exchange>      processsync(Exchange exchange, AsyncHandler handler)
> > }
> >
> > Then rather than adding a kinda done() method to the Exchange and
> > calling it throughout every single producer/consumer/Processor
> > implementation; we could just use the Future object to know when a
> > particular asynchronous operation has completed. i.e. keep the async
> > API to the side, for those rare cases folks really wanna use it -
> > otherwise we can all stick to the simple sync API that works easily
> > with transactions.
> >
>
> This might be a good option.  I think that we don't need the
> "Future<Exchange>      processAsync(Exchange exchange)" call since to
> make an exchange async you just need to route it through a seda:
> component.
>
> so perhaps we just add:
> Future<Exchange>      processs(Exchange exchange, AsyncHandler handler)
>
> If the path of the exchange is sync, the it's a blocking call and by
> the time it returns the Future will be done.  But it reached an async
> component like seda: then it will return without the Future being
> completed.
>
> > Thoughts?
>
> Sounds like like a good approach...  Perhaps I'll prototype it..

Go for it! :)

-- 
James
-------
http://macstrac.blogspot.com/

Re: Asynchronous Exchange Processing

Posted by Hiram Chirino <hi...@hiramchirino.com>.
On 8/23/07, James Strachan <ja...@gmail.com> wrote:
> On 8/22/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> > Hi,
> >
> > Most of our components currently depend on synchronous processing of
> > the Exchange or bad things can happen.  For example the following does
> > not work:
> >
> > from("file:/tmp/foo").to("seda:test");
> > from("seda:test").process( myProcessor );
> >
> > Why? because the file component delete the file as soon as the
> > exchange returns from being sent to seda:test.  What would have been
> > nice is that file deletion did not occur until after the exchange is
> > processed by myProcessor.  But that's occuring in an asynchronous
> > thread.
> >
> > Here's an idea that might help solve this problem.
> > Have the seda component call something like
> >    exchange.getExchangeFuture().done()
> > when the message is processed in it's async thread.
> >
> > and in the file component, have it call
> >    exchange.getExchangeFuture().get();
> >    // then the code that deletes the file
> > or
> >    exchange.getExchangeFuture().setCallback( new Callback() {
> >      public void done( Exchange exch ) {
> >         // then the code that deletes the file
> >      }
> > })
>
> I was pondering about this with relation to this thread the other day...
> http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html
>
> I definitely think we need a standard way to register
> post-commit/rollback hooks. i.e. on completion of processing (either
> on a commit/completed or rollback/failed) allow a
> processor/consumer/producer to register some logic such as to delete a
> file, flush some cache etc. Note this is mostly required for
> non-transactional things. e.g. in JPA and JMS we can just use
> transactions for this.

Actually transaction things are easy since they require all processing
in the transaction to be done synchronously.  The hard bit is
processing the exchanges async.

>
> I'm kinda wondering; should we just try make things like files, FTP
> and the like transactional; that is to say, we implement transaction
> hooks so that we can do a file 'delete/rename' which is registered as
> a transaction commit status listener? Just registering some kind of
> onCommit/onRollback callbacks would do the trick though as you
> suggest.
>

I don't like the idea of making this look like transaction semantics
when it's not.  Traditional transaction semantics force you to do
processing synchronously.  And the point of this is exactly the
opposite.

> There's a second issue which is asynchronous processing; such as a
> producer invoking an asynchronous processor then wanting some kind of
> callback that the processing has completed. I wanted to make the easy
> things really easy with Camel; so was a bit reluctant to add
> asynchronous processing explicitly from the start for fear of making
> the API very complex; most components afterall tend to be synchronous
> (which makes transactions very easy to do too btw).
>

I agree with this..  and this is my greatest fear.  We need to make
sure that the synchronous components stay as simple as they are today.
 But allow async-aware components to support having their exchanges
processed async.

> I was thinking we could add some optional API for AsyncProcessor which
> is-a Processor but adds an asynchronous invocation API style; rather
> like the Channel does in the ServiceMix 4 API...
>
> // sync API
> interface Processor {
>         void    process(Exchange exchange);
> }
>
> interface AsyncProcessor extends Processor {
>   // async methods
>   Future<Exchange>      processAsync(Exchange exchange)
>   Future<Exchange>      processsync(Exchange exchange, AsyncHandler handler)
> }
>
> Then rather than adding a kinda done() method to the Exchange and
> calling it throughout every single producer/consumer/Processor
> implementation; we could just use the Future object to know when a
> particular asynchronous operation has completed. i.e. keep the async
> API to the side, for those rare cases folks really wanna use it -
> otherwise we can all stick to the simple sync API that works easily
> with transactions.
>

This might be a good option.  I think that we don't need the
"Future<Exchange>      processAsync(Exchange exchange)" call since to
make an exchange async you just need to route it through a seda:
component.

so perhaps we just add:
Future<Exchange>      process(Exchange exchange, AsyncHandler handler)

If the path of the exchange is sync, then it's a blocking call and by
the time it returns the Future will be done.  But if it reached an async
component like seda: then it will return without the Future being
completed.

> Thoughts?

Sounds like a good approach...  Perhaps I'll prototype it..

> --
> James
> -------
> http://macstrac.blogspot.com/
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Asynchronous Exchange Processing

Posted by James Strachan <ja...@gmail.com>.
On 8/22/07, Hiram Chirino <hi...@hiramchirino.com> wrote:
> Hi,
>
> Most of our components currently depend on synchronous processing of
> the Exchange or bad things can happen.  For example the following does
> not work:
>
> from("file:/tmp/foo").to("seda:test");
> from("seda:test").process( myProcessor );
>
> Why? because the file component delete the file as soon as the
> exchange returns from being sent to seda:test.  What would have been
> nice is that file deletion did not occur until after the exchange is
> processed by myProcessor.  But that's occuring in an asynchronous
> thread.
>
> Here's an idea that might help solve this problem.
> Have the seda component call something like
>    exchange.getExchangeFuture().done()
> when the message is processed in it's async thread.
>
> and in the file component, have it call
>    exchange.getExchangeFuture().get();
>    // then the code that deletes the file
> or
>    exchange.getExchangeFuture().setCallback( new Callback() {
>      public void done( Exchange exch ) {
>         // then the code that deletes the file
>      }
> })

I was pondering about this with relation to this thread the other day...
http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html

I definitely think we need a standard way to register
post-commit/rollback hooks. i.e. on completion of processing (either
on a commit/completed or rollback/failed) allow a
processor/consumer/producer to register some logic such as to delete a
file, flush some cache etc. Note this is mostly required for
non-transactional things. e.g. in JPA and JMS we can just use
transactions for this.

I'm kinda wondering; should we just try to make things like files, FTP
and the like transactional; that is to say, we implement transaction
hooks so that we can do a file 'delete/rename' which is registered as
a transaction commit status listener? Just registering some kind of
onCommit/onRollback callbacks would do the trick though as you
suggest.

There's a second issue which is asynchronous processing; such as a
producer invoking an asynchronous processor then wanting some kind of
callback that the processing has completed. I wanted to make the easy
things really easy with Camel; so was a bit reluctant to add
asynchronous processing explicitly from the start for fear of making
the API very complex; most components after all tend to be synchronous
(which makes transactions very easy to do too btw).

I was thinking we could add some optional API for AsyncProcessor which
is-a Processor but adds an asynchronous invocation API style; rather
like the Channel does in the ServiceMix 4 API...

// sync API
interface Processor {
	void 	process(Exchange exchange);
}

interface AsyncProcessor extends Processor {
  // async methods
  Future<Exchange> 	processAsync(Exchange exchange)
  Future<Exchange> 	processAsync(Exchange exchange, AsyncHandler handler)
}

Then rather than adding a kinda done() method to the Exchange and
calling it throughout every single producer/consumer/Processor
implementation; we could just use the Future object to know when a
particular asynchronous operation has completed. i.e. keep the async
API to the side, for those rare cases folks really wanna use it -
otherwise we can all stick to the simple sync API that works easily
with transactions.

Thoughts?
-- 
James
-------
http://macstrac.blogspot.com/

Re: Asynchronous Exchange Processing

Posted by Rob Davies <ra...@gmail.com>.
On Aug 22, 2007, at 8:58 PM, Hiram Chirino wrote:

> Hi,
>
> Most of our components currently depend on synchronous processing of
> the Exchange or bad things can happen.  For example the following does
> not work:
>
> from("file:/tmp/foo").to("seda:test");
> from("seda:test").process( myProcessor );
>
> Why? because the file component delete the file as soon as the
> exchange returns from being sent to seda:test.  What would have been
> nice is that file deletion did not occur until after the exchange is
> processed by myProcessor.  But that's occuring in an asynchronous
> thread.
That's why I didn't put file deletion in originally :( - I'd assumed  
the consumer of the file exchange would have to clean it up (as  
there's no other way to know when the file can be deleted).
>
> Here's an idea that might help solve this problem.
> Have the seda component call something like
>    exchange.getExchangeFuture().done()
> when the message is processed in it's async thread.
>
> and in the file component, have it call
>    exchange.getExchangeFuture().get();
>    // then the code that deletes the file
> or
>    exchange.getExchangeFuture().setCallback( new Callback() {
>      public void done( Exchange exch ) {
>         // then the code that deletes the file
>      }
> })
>
> It's just a simple stab at a possible solution.  I got a feeling that
> it's going to get more complicated since now we are forcing components
> to be aware of the async processing model and we tend to copy
> exchanges and do processing in pipelines etc. etc.  But I'm hoping to
> get the conversation started on this topic.  What do you guys think??
> is there a simpler way to solve this?
I like the callback approach! +1 for that!
>
> Bellow you will find a simple patch that implements what the
> exchange.getExchangeFuture() method might look like.
>
> Index: camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java
> ===================================================================
> --- camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java	(revision
> 0)
> +++ camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java	(revision
> 0)
> @@ -0,0 +1,158 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file  
> distributed with
> + * this work for additional information regarding copyright  
> ownership.
> + * The ASF licenses this file to You under the Apache License,  
> Version 2.0
> + * (the "License"); you may not use this file except in compliance  
> with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,  
> software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or  
> implied.
> + * See the License for the specific language governing permissions  
> and
> + * limitations under the License.
> + */
> +package org.apache.camel.impl;
> +
> +import java.util.concurrent.CancellationException;
> +import java.util.concurrent.CountDownLatch;
> +import java.util.concurrent.ExecutionException;
> +import java.util.concurrent.Future;
> +import java.util.concurrent.TimeUnit;
> +import java.util.concurrent.TimeoutException;
> +
> +import org.apache.camel.Exchange;
> +
> +/**
> + * The DefaultExchangeFuture allows Exchanges to processed
> asynchronously by allowing
> + * the producer and consumer of the exchange to signal each other  
> so that it
> + * is known when processing has completed.
> + *
> + * For processors participating on asynchronous exchanges, once an
> exchange has been
> + * processed, the DefaultExchangeFuture.done() method should be
> called to let the exchange
> + * creator know that the exchange has been processed.
> + *
> + * For exchange creators (these are typically the Component
> Consumers), if the exchange is being
> + * processes async, it should either wait for the exchange to get
> completed asynchronously using
> + * one of the get() methods or it should register a Callback using
> the setCallback() method.  Once
> + * the exchange is done, then he can destroy the originating event.
> + *
> + */
> +public class DefaultExchangeFuture implements Future<Exchange> {
> +
> +    private final Exchange exchange;
> +    private final CountDownLatch latch = new CountDownLatch(1);
> +    private boolean done;
> +    private boolean canceled;
> +    private Callback callback;
> +
> +    public static interface Callback {
> +        void done(Exchange exchange);
> +    }
> +
> +
> +    /**
> +     * @param exchange
> +     */
> +    public DefaultExchangeFuture(Exchange exchange) {
> +        this.exchange = exchange;
> +    }
> +
> +    public boolean cancel(boolean mayInterruptIfRunning) {
> +        boolean rc = false;
> +        Callback c = null;
> +
> +        synchronized (this) {
> +            if (!done && !canceled) {
> +                c = callback;
> +                canceled = true;
> +                latch.countDown();
> +                rc = true;
> +            }
> +        }
> +
> +        if (rc) {
> +            latch.countDown();
> +            if (c != null) {
> +                c.done(exchange);
> +            }
> +        }
> +        return rc;
> +    }
> +
> +    public Exchange get() throws InterruptedException,  
> ExecutionException {
> +        latch.await();
> +        synchronized (this) {
> +            if (canceled) {
> +                throw new CancellationException();
> +            }
> +            // TODO: We might want to do this..
> +            // if (exception != null) {
> +            // throw new ExecutionException(exception);
> +            // }
> +            return exchange;
> +        }
> +    }
> +
> +    public Exchange get(long timeout, TimeUnit unit) throws
> InterruptedException, ExecutionException, TimeoutException {
> +        latch.await(timeout, unit);
> +        synchronized (this) {
> +            if (canceled) {
> +                throw new CancellationException();
> +            }
> +            // TODO: We might want to do this..
> +            // if (exception != null) {
> +            // throw new ExecutionException(exception);
> +            // }
> +            return exchange;
> +        }
> +    }
> +
> +    public synchronized boolean isCancelled() {
> +        return canceled;
> +    }
> +
> +    public synchronized boolean isDone() {
> +        return done || canceled;
> +    }
> +
> +    public synchronized Callback getCallback() {
> +        return callback;
> +    }
> +
> +    /**
> +     * Registers a callback handler with the future if the future  
> is not yet
> +     * completed.
> +     *
> +     * @param callback
> +     * @return false if the callback could not get registered due  
> to the future
> +     *         being done.
> +     */
> +    public synchronized boolean setCallback(Callback callback) {
> +        if (isDone()) {
> +            return false;
> +        }
> +        this.callback = callback;
> +        return true;
> +    }
> +
> +    public void done() throws CancellationException,  
> IllegalStateException {
> +        Callback c = null;
> +        synchronized (this) {
> +            if (canceled) {
> +                throw new CancellationException();
> +            }
> +            if (done) {
> +                throw new IllegalStateException("Exchange is  
> allready done");
> +            }
> +            done = true;
> +            c = callback;
> +        }
> +        latch.countDown();
> +        if (c != null) {
> +            c.done(exchange);
> +        }
> +    }
> +}
>
> Property changes on:
> camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchangeFuture.java
> ___________________________________________________________________
> Name: svn:keywords
>    + Rev Date
> Name: svn:eol-style
>    + native
>
> Index: camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchange.java
> ===================================================================
> --- camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchange.java	(revision
> 568699)
> +++ camel-core/src/main/java/org/apache/camel/impl/ 
> DefaultExchange.java	(working
> copy)
> @@ -38,7 +38,8 @@
>      private Message fault;
>      private Throwable exception;
>      private String exchangeId =
> DefaultExchange.DEFAULT_ID_GENERATOR.generateId();
> -
> +    private final DefaultExchangeFuture exchangeFuture = new
> DefaultExchangeFuture(this);
> +
>      public DefaultExchange(CamelContext context) {
>          this.context = context;
>      }
> @@ -222,4 +223,9 @@
>              messageSupport.setExchange(this);
>          }
>      }
> +
> +
> +    DefaultExchangeFuture getExchangeFuture() {
> +        return exchangeFuture;
> +    }
>  }
>
>
> -- 
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com