You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/10/13 20:03:30 UTC

svn commit: r463758 - in /incubator/servicemix/trunk/servicemix-bean: ./ src/main/java/org/apache/servicemix/bean/ src/main/java/org/apache/servicemix/bean/support/ src/test/java/org/apache/servicemix/bean/ src/test/java/org/apache/servicemix/bean/beans/

Author: gnodet
Date: Fri Oct 13 11:03:28 2006
New Revision: 463758

URL: http://svn.apache.org/viewvc?view=rev&rev=463758
Log:
First draft of support for async InOut as consumers and providers.
Still needs lots of work :(

Added:
    incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Callback.java
    incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Destination.java
    incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/ExchangeTarget.java
    incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java
    incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/ReflectionUtils.java
    incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java
    incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerBean.java
Modified:
    incubator/servicemix/trunk/servicemix-bean/   (props changed)
    incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
    incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java

Propchange: incubator/servicemix/trunk/servicemix-bean/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Oct 13 11:03:28 2006
@@ -0,0 +1,4 @@
+target
+.classpath
+.project
+.settings

Modified: incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?view=diff&rev=463758&r1=463757&r2=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java Fri Oct 13 11:03:28 2006
@@ -16,21 +16,46 @@
  */
 package org.apache.servicemix.bean;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+
 import org.aopalliance.intercept.MethodInvocation;
+import org.apache.commons.jexl.Expression;
+import org.apache.commons.jexl.ExpressionFactory;
+import org.apache.commons.jexl.JexlContext;
+import org.apache.commons.jexl.JexlHelper;
 import org.apache.servicemix.MessageExchangeListener;
-import org.apache.servicemix.common.ProviderEndpoint;
-import org.apache.servicemix.common.ExchangeProcessor;
 import org.apache.servicemix.bean.support.BeanInfo;
 import org.apache.servicemix.bean.support.DefaultMethodInvocationStrategy;
 import org.apache.servicemix.bean.support.MethodInvocationStrategy;
+import org.apache.servicemix.bean.support.ReflectionUtils;
+import org.apache.servicemix.common.EndpointComponentContext;
+import org.apache.servicemix.common.ExchangeProcessor;
+import org.apache.servicemix.common.ProviderEndpoint;
+import org.apache.servicemix.jbi.FaultException;
+import org.apache.servicemix.jbi.resolver.URIResolver;
+import org.apache.servicemix.jbi.util.MessageUtil;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.BeanFactory;
 import org.springframework.beans.factory.BeanFactoryAware;
 
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.servicedesc.ServiceEndpoint;
-
 /**
  * Represents a bean endpoint which consists of a together with a {@link MethodInvocationStrategy}
  * so that JBI message exchanges can be invoked on the bean.
@@ -44,6 +69,12 @@
     private Object bean;
     private BeanInfo beanInfo;
     private MethodInvocationStrategy methodInvocationStrategy;
+    
+    private Map<String, Holder> exchanges = new ConcurrentHashMap<String, Holder>();
+    private Map<String, Request> requests = new ConcurrentHashMap<String, Request>();
+    private ThreadLocal<Request> currentRequest = new ThreadLocal<Request>();
+    private ComponentContext context;
+    private DeliveryChannel channel;
 
     public BeanEndpoint() {
     }
@@ -55,6 +86,8 @@
 
     public void start() throws Exception {
         super.start();
+        context = new EndpointComponentContext(getServiceUnit().getComponent().getComponentContext());
+        channel = context.getDeliveryChannel();
 
         if (getBean() == null) {
             throw new IllegalArgumentException("No 'bean' property set");
@@ -64,9 +97,8 @@
         }
 
         injectBean(getBean());
-
-        // TODO invoke the bean's lifecycle methods for @PostConstruct
-        // could use Spring to do this?
+        // Call PostConstruct annotated methods
+        ReflectionUtils.callLifecycleMethod(getBean(), PostConstruct.class);
 
         if (getBean() instanceof ExchangeProcessor) {
             ExchangeProcessor processor = (ExchangeProcessor) getBean();
@@ -142,39 +174,47 @@
 
 
     public void process(MessageExchange exchange) throws Exception {
-        // TODO refactor this validation code back up into the base class?
-
-        // The component acts as a provider, this means that another component has requested our service
-        // As this exchange is active, this is either an in or a fault (out are sent by this component)
-        if (exchange.getRole() != MessageExchange.Role.PROVIDER) {
-            throw new IllegalStateException("Unsupported role: " + exchange.getRole());
+        if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+            onConsumerExchange(exchange);
+        } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+            onProviderExchange(exchange);
+        } else {
+            throw new IllegalStateException("Unknown role: " + exchange.getRole());
         }
-
+    }
+    
+    protected void onProviderExchange(MessageExchange exchange) throws Exception {
         // Exchange is finished
         if (exchange.getStatus() == ExchangeStatus.DONE) {
             return;
-            // Exchange has been aborted with an exception
         }
+        // Exchange has been aborted with an exception
         else if (exchange.getStatus() == ExchangeStatus.ERROR) {
             return;
-        }
-
         // Fault message
-        if (exchange.getFault() != null) {
+        } else if (exchange.getFault() != null) {
+            // TODO: find a way to send it back to the bean before setting the DONE status
             done(exchange);
-            return;
-        }
-        try {
+        } else {
             onMessageExchange(exchange);
             done(exchange);
         }
-        catch (Exception e) {
-            fail(exchange, e);
+    }
+    
+    /**
+     * Handles an exchange delivered to this endpoint in its consumer role, that is
+     * the answer to an exchange previously sent through a {@link DestinationImpl}.
+     * The pending {@link Holder} registered under the exchange id is completed with
+     * the exchange, then the callbacks of the request that sent it are evaluated.
+     *
+     * @param exchange the exchange received as a consumer
+     * @throws IllegalStateException if no holder is registered for the exchange id
+     */
+    protected void onConsumerExchange(MessageExchange exchange) throws Exception {
+        // Remove the holder instead of merely looking it up: the bean keeps its own
+        // reference to the Future, and leaving the entry in the map would leak one
+        // holder per completed exchange for the lifetime of the endpoint.
+        Holder me = exchanges.remove(exchange.getExchangeId());
+        if (me == null) {
+            throw new IllegalStateException("Consumer exchange not found");
         }
+        me.set(exchange);
+        // Guard against a missing request so a lookup miss cannot surface as a
+        // NullPointerException inside evaluateCallbacks.
+        Request req = requests.remove(exchange.getExchangeId());
+        if (req != null) {
+            evaluateCallbacks(req);
+        }
     }
 
     protected void onMessageExchange(MessageExchange exchange) throws Exception {
-        Object pojo = getBean();
+        Request req = new Request(getBean(), exchange);
+        requests.put(exchange.getExchangeId(), req);
+        currentRequest.set(req);
+        Object pojo = req.getBean();
         if (pojo instanceof MessageExchangeListener) {
             MessageExchangeListener listener = (MessageExchangeListener) pojo;
             listener.onMessageExchange(exchange);
@@ -190,11 +230,9 @@
             }
             try {
                 invocation.proceed();
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 throw e;
-            }
-            catch (Throwable throwable) {
+            } catch (Throwable throwable) {
                 throw new MethodInvocationFailedException(pojo, invocation, exchange, this, throwable);
             }
         }
@@ -220,6 +258,164 @@
      *
      * @param bean the bean to be injected
      */
-    protected void injectBean(Object bean) {
+    protected void injectBean(final Object bean) {
+        // Inject fields
+        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
+            public void doWith(Field f) throws IllegalArgumentException, IllegalAccessException {
+                ExchangeTarget et = f.getAnnotation(ExchangeTarget.class);
+                if (et != null) {
+                    ReflectionUtils.setField(f, bean, new DestinationImpl(et.uri()));
+                }
+                if (f.getAnnotation(Resource.class) != null) {
+                    if (ComponentContext.class.isAssignableFrom(f.getType())) {
+                        ReflectionUtils.setField(f, bean, context);
+                    } else if (DeliveryChannel.class.isAssignableFrom(f.getType())) {
+                        ReflectionUtils.setField(f, bean, channel);
+                    }
+                }
+            }
+        });
+    }
+    
+    protected void evaluateCallbacks(Request req) {
+        final Object bean = req.getBean();
+        ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
+            @SuppressWarnings("unchecked")
+            public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
+                if (method.getAnnotation(Callback.class) != null) {
+                    try {
+                        Expression e = ExpressionFactory.createExpression(method.getAnnotation(Callback.class).condition());
+                        JexlContext jc = JexlHelper.createContext();
+                        jc.getVars().put("this", bean);
+                        Object r = e.evaluate(jc);
+                        if (r instanceof Boolean && ((Boolean) r).booleanValue()) {
+                            Object o = method.invoke(bean, new Object[0]);
+                            // TODO
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException("Unable to invoke callback", e);
+                    }
+                }
+            }
+        });
+    }
+    
+    /**
+     * {@link Destination} bound to a target URI. Instances are injected into bean
+     * fields annotated with {@link ExchangeTarget}; every message sent is wrapped in
+     * an InOut exchange whose answer is exposed through the returned {@link Future}.
+     */
+    public class DestinationImpl implements Destination {
+
+        private final String uri;
+
+        /**
+         * @param uri the URI used to configure the target of each exchange
+         */
+        public DestinationImpl(String uri) {
+            this.uri = uri;
+        }
+
+        /**
+         * @return a new, empty normalized message
+         */
+        public NormalizedMessage createMessage() {
+            return new MessageUtil.NormalizedMessageImpl();
+        }
+
+        /**
+         * Copies the message into the "in" message of a new InOut exchange targeted
+         * at this destination's URI and sends it.
+         *
+         * @param message the message to send
+         * @return a future completed when the exchange comes back to the endpoint
+         * @throws RuntimeException wrapping any failure to create or send the exchange
+         */
+        public Future<NormalizedMessage> send(NormalizedMessage message) {
+            try {
+                InOut me = getExchangeFactory().createInOutExchange();
+                URIResolver.configureExchange(me, getServiceUnit().getComponent().getComponentContext(), uri);
+                MessageUtil.transferTo(message, me, "in");
+                return send(me);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Registers the exchange against the request bound to the current thread,
+         * then sends it through the enclosing endpoint.
+         *
+         * @param me the exchange to send
+         * @return the holder that will receive the exchange once it is answered
+         * @throws IllegalStateException if no request is bound to the current thread
+         * @throws Exception if the endpoint fails to send the exchange
+         */
+        protected Future<NormalizedMessage> send(final MessageExchange me) throws Exception {
+            // ConcurrentHashMap rejects null values, so fail with an explicit message
+            // rather than an anonymous NullPointerException from requests.put().
+            final Request req = currentRequest.get();
+            if (req == null) {
+                throw new IllegalStateException("No request is being processed by the current thread");
+            }
+            final String id = me.getExchangeId();
+            final Holder h = new Holder();
+            requests.put(id, req);
+            exchanges.put(id, h);
+            try {
+                BeanEndpoint.this.send(me);
+            } catch (Exception e) {
+                // No answer will ever arrive for an exchange that was not sent:
+                // drop the bookkeeping entries so they do not leak.
+                exchanges.remove(id);
+                requests.remove(id);
+                throw e;
+            }
+            return h;
+        }
+    }
+    
+    /**
+     * Pairs a bean with the message exchange it is handling and keeps the ids of
+     * the exchanges registered through {@link #addSentExchange(String)}.
+     */
+    public static class Request {
+
+        private Object bean;
+        private MessageExchange exchange;
+        /** Ids recorded by {@link #addSentExchange(String)}. */
+        private Set<String> sentExchanges = new HashSet<String>();
+
+        /**
+         * Creates a request with no bean and no exchange.
+         */
+        public Request() {
+        }
+
+        /**
+         * Creates a request for the given bean and exchange.
+         *
+         * @param bean the bean handling the exchange
+         * @param exchange the exchange being handled
+         */
+        public Request(Object bean, MessageExchange exchange) {
+            this.bean = bean;
+            this.exchange = exchange;
+        }
+
+        /**
+         * Returns the bean of this request.
+         *
+         * @return the bean
+         */
+        public Object getBean() {
+            return this.bean;
+        }
+
+        /**
+         * Replaces the bean of this request.
+         *
+         * @param bean the bean to set
+         */
+        public void setBean(Object bean) {
+            this.bean = bean;
+        }
+
+        /**
+         * Returns the exchange of this request.
+         *
+         * @return the exchange
+         */
+        public MessageExchange getExchange() {
+            return this.exchange;
+        }
+
+        /**
+         * Replaces the exchange of this request.
+         *
+         * @param exchange the exchange to set
+         */
+        public void setExchange(MessageExchange exchange) {
+            this.exchange = exchange;
+        }
+
+        /**
+         * Records the id of an exchange that has been sent.
+         *
+         * @param id the id of the exchange sent
+         */
+        public void addSentExchange(String id) {
+            this.sentExchanges.add(id);
+        }
+    }
+    
+    public static class Holder implements Future<NormalizedMessage> {
+        
+        private MessageExchange object;
+        private boolean cancel;
+        
+        public synchronized NormalizedMessage get() throws InterruptedException, ExecutionException {
+            if (object == null) {
+                wait();
+            }
+            return extract(object);
+        }
+        public synchronized NormalizedMessage get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
+            if (object == null) {
+                wait(unit.toMillis(timeout));
+            }
+            return extract(object);
+        }
+        public synchronized void set(MessageExchange t) {
+            object = t;
+            notifyAll();
+        }
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            cancel = true;
+            return false;
+        }
+        public boolean isCancelled() {
+            return cancel;
+        }
+        public boolean isDone() {
+            return object != null;
+        }
+        protected NormalizedMessage extract(MessageExchange me) throws ExecutionException {
+            if (me.getStatus() == ExchangeStatus.ERROR) {
+                throw new ExecutionException(me.getError());
+            } else if (me.getFault() != null) {
+                throw new ExecutionException(new FaultException("Fault occured", me, me.getFault()));
+            } else {
+                return me.getMessage("out");
+            }
+        }
     }
 }

Added: incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Callback.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Callback.java?view=auto&rev=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Callback.java (added)
+++ incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Callback.java Fri Oct 13 11:03:28 2006
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a bean method as a callback of a {@link BeanEndpoint}. When an exchange
+ * sent by the bean comes back to the endpoint, the endpoint evaluates the
+ * {@link #condition()} of every annotated method and invokes, without arguments,
+ * those whose condition is true.
+ */
+@Target(METHOD)
+@Retention(RUNTIME)
+public @interface Callback {
+
+    /**
+     * JEXL expression deciding whether the method is invoked. The bean is
+     * available in the expression as <code>this</code>.
+     */
+    String condition();
+}

Added: incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Destination.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Destination.java?view=auto&rev=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Destination.java (added)
+++ incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Destination.java Fri Oct 13 11:03:28 2006
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import java.util.concurrent.Future;
+
+import javax.jbi.messaging.NormalizedMessage;
+
+/**
+ * Target to which a bean can send normalized messages. Implementations are
+ * injected by {@link BeanEndpoint} into fields annotated with {@link ExchangeTarget}.
+ */
+public interface Destination {
+
+    /**
+     * Creates a new message that can be filled in and passed to
+     * {@link #send(NormalizedMessage)}.
+     *
+     * @return a new normalized message
+     */
+    NormalizedMessage createMessage();
+
+    /**
+     * Sends the message to this destination.
+     *
+     * @param message the message to send
+     * @return a future giving access to the answer
+     */
+    Future<NormalizedMessage> send(NormalizedMessage message);
+
+}

Added: incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/ExchangeTarget.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/ExchangeTarget.java?view=auto&rev=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/ExchangeTarget.java (added)
+++ incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/ExchangeTarget.java Fri Oct 13 11:03:28 2006
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a bean field that {@link BeanEndpoint} fills with a {@link Destination}
+ * bound to the given {@link #uri()}.
+ */
+@Target(FIELD)
+@Retention(RUNTIME)
+public @interface ExchangeTarget {
+
+    /**
+     * URI of the target the injected destination sends its exchanges to,
+     * for example <code>service:urn:service1</code>.
+     */
+    String uri();
+
+}

Added: incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java?view=auto&rev=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java (added)
+++ incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/Operation.java Fri Oct 13 11:03:28 2006
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a bean method as an operation.
+ * NOTE(review): the name is presumably matched against the operation of the
+ * incoming exchange when selecting the method to invoke; confirm against BeanInfo.
+ */
+@Target(METHOD)
+@Retention(RUNTIME)
+public @interface Operation {
+
+    /**
+     * Name of the operation; defaults to the empty string.
+     */
+    String name() default "";
+
+}

Modified: incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java?view=diff&rev=463758&r1=463757&r2=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java (original)
+++ incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/BeanInfo.java Fri Oct 13 11:03:28 2006
@@ -58,7 +58,7 @@
         this.type = type;
         this.strategy = strategy;
         introspect(type);
-        if (operations.size() == 0) {
+        if (operations.size() == 1) {
             Collection<MethodInfo> methodInfos = operations.values();
             for (MethodInfo methodInfo : methodInfos) {
                 defaultExpression = methodInfo;

Added: incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/ReflectionUtils.java?view=auto&rev=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/ReflectionUtils.java (added)
+++ incubator/servicemix/trunk/servicemix-bean/src/main/java/org/apache/servicemix/bean/support/ReflectionUtils.java Fri Oct 13 11:03:28 2006
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean.support;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+public class ReflectionUtils extends org.springframework.util.ReflectionUtils {
+
+    public static <T extends Annotation> void callLifecycleMethod(final Object bean, final Class<T> annotation) {
+        ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
+            public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
+                if (method.getAnnotation(annotation) != null) {
+                    try {
+                        method.invoke(bean, (Object[]) null);
+                    } catch (IllegalArgumentException ex) {
+                        throw new IllegalStateException("Failure to invoke " + method + " on " + bean.getClass() + ": args=[]", ex);
+                    } catch (IllegalAccessException ex) {
+                        throw new UnsupportedOperationException(ex.toString());
+                    } catch (InvocationTargetException ex) {
+                        throw new UnsupportedOperationException("PostConstruct method on bean threw exception", ex.getTargetException());
+                    }
+                }
+            }
+        });
+    }
+    
+    public static void setField(Field f, Object instance, Object value) {
+        try {
+            boolean oldAccessible = f.isAccessible();
+            boolean shouldSetAccessible = (!Modifier.isPublic(f.getModifiers()) && !oldAccessible);
+            if (shouldSetAccessible) {
+                f.setAccessible(true);
+            }
+            f.set(instance, value);
+            if (shouldSetAccessible) {
+                f.setAccessible(oldAccessible);
+            }
+        } catch (IllegalArgumentException ex) {
+            throw new UnsupportedOperationException("Cannot inject value of class '" + value.getClass() + "' into " + f);
+        } catch (IllegalAccessException ex) {
+            ReflectionUtils.handleReflectionException(ex);
+        }
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java?view=auto&rev=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java (added)
+++ incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java Fri Oct 13 11:03:28 2006
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.bean.beans.ConsumerBean;
+import org.apache.servicemix.client.DefaultServiceMixClient;
+import org.apache.servicemix.client.ServiceMixClient;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+
+/**
+ * Sends an InOut exchange to a {@link BeanEndpoint} backed by a {@link ConsumerBean},
+ * which in turn calls two echo services, and checks that the exchange does not
+ * end in error or with a fault.
+ */
+public class ConsumerBeanTest extends TestCase {
+
+    protected JBIContainer jbi;
+
+    protected void setUp() throws Exception {
+        jbi = new JBIContainer();
+        jbi.setEmbedded(true);
+        jbi.init();
+    }
+
+    /**
+     * Shuts the embedded container down so that it does not outlive the test.
+     */
+    protected void tearDown() throws Exception {
+        if (jbi != null) {
+            jbi.shutDown();
+        }
+    }
+
+    public void test() throws Exception {
+        BeanComponent bc = new BeanComponent();
+        BeanEndpoint ep = new BeanEndpoint();
+        ep.setService(new QName("bean"));
+        ep.setEndpoint("endpoint");
+        ep.setBean(new ConsumerBean());
+        bc.setEndpoints(new BeanEndpoint[] { ep });
+        jbi.activateComponent(bc, "servicemix-bean");
+
+        EchoComponent echo1 = new EchoComponent(new QName("urn", "service1"), "endpoint");
+        jbi.activateComponent(echo1, "echo1");
+
+        EchoComponent echo2 = new EchoComponent(new QName("urn", "service2"), "endpoint");
+        jbi.activateComponent(echo2, "echo2");
+
+        jbi.start();
+
+        ServiceMixClient client = new DefaultServiceMixClient(jbi);
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("bean"));
+        me.setOperation(new QName("receive"));
+        NormalizedMessage nm = me.getInMessage();
+        nm.setContent(new StringSource("<hello>world</hello>"));
+        client.sendSync(me);
+        assertExchangeWorked(me);
+        client.done(me);
+    }
+
+    /**
+     * Fails the test when the exchange is in ERROR status or carries a fault.
+     *
+     * @param me the exchange to check
+     * @throws Exception the error attached to the exchange, if any
+     */
+    protected void assertExchangeWorked(MessageExchange me) throws Exception {
+        if (me.getStatus() == ExchangeStatus.ERROR) {
+            if (me.getError() != null) {
+                throw me.getError();
+            }
+            else {
+                fail("Received ERROR status");
+            }
+        }
+        else if (me.getFault() != null) {
+            fail("Received fault: " + new SourceTransformer().toString(me.getFault().getContent()));
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerBean.java?view=auto&rev=463758
==============================================================================
--- incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerBean.java (added)
+++ incubator/servicemix/trunk/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerBean.java Fri Oct 13 11:03:28 2006
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean.beans;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.bean.Callback;
+import org.apache.servicemix.bean.Destination;
+import org.apache.servicemix.bean.ExchangeTarget;
+import org.apache.servicemix.bean.Operation;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+public class ConsumerBean {
+
+    private Future<NormalizedMessage> request1;
+    private Future<NormalizedMessage> request2;
+
+    @Resource
+    private ComponentContext context;
+    
+    @Resource
+    private DeliveryChannel channel;
+    
+    @ExchangeTarget(uri="service:urn:service1")
+    private Destination service1;
+
+    @ExchangeTarget(uri="service:urn:service2")
+    private Destination service2;  
+    
+    /**
+     * @return the request1
+     */
+    public Future<NormalizedMessage> getRequest1() {
+        return request1;
+    }
+
+    /**
+     * @return the request2
+     */
+    public Future<NormalizedMessage> getRequest2() {
+        return request2;
+    }
+
+    @PostConstruct
+    public void init() {
+        if (service1 == null || service2 == null || context == null || channel == null) {
+            throw new IllegalStateException("Bean not initialized");
+        }
+    }
+    
+    @PreDestroy
+    public void destroy() {
+    }
+    
+    @Operation(name="receive")
+    public void receive(NormalizedMessage message) throws Exception {
+        request1 = service1.send(MessageUtil.copy(message));
+        request2 = service2.send(MessageUtil.copy(message));
+    }
+    
+    @Callback(condition="this.request1.done && this.request2.done")
+    public NormalizedMessage answer() throws InterruptedException, ExecutionException {
+        NormalizedMessage answer1 = request1.get();
+        NormalizedMessage answer2 = request2.get();
+        return null;
+    }
+    
+}