You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/09/03 11:43:43 UTC

svn commit: r691553 - in /servicemix/components/engines/servicemix-bean/trunk/src: main/java/org/apache/servicemix/bean/ test/java/org/apache/servicemix/bean/ test/java/org/apache/servicemix/bean/beans/ test/resources/

Author: gnodet
Date: Wed Sep  3 02:43:42 2008
New Revision: 691553

URL: http://svn.apache.org/viewvc?rev=691553&view=rev
Log:
SM-1134: sending an exchange in a thread created by the bean results in an NPE

Added:
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/SenderBeanTest.java
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java
    servicemix/components/engines/servicemix-bean/trunk/src/test/resources/sender-bean.xml
Modified:
    servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
    servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java

Modified: servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=691553&r1=691552&r2=691553&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java Wed Sep  3 02:43:42 2008
@@ -106,10 +106,12 @@
         }
         Object pojo = getBean();
         if (pojo != null) {
+            beanType = pojo.getClass();
             injectBean(pojo);
             ReflectionUtils.callLifecycleMethod(pojo, PostConstruct.class);
+        } else {
+            beanType = createBean().getClass();
         }
-        beanType = pojo != null ? pojo.getClass() : createBean().getClass();
         if (getMethodInvocationStrategy() == null) {
             throw new IllegalArgumentException("No 'methodInvocationStrategy' property set");
         }
@@ -215,17 +217,7 @@
 
     protected void onProviderExchange(MessageExchange exchange) throws Exception {
         Object corId = getCorrelation(exchange);
-        Request req = requests.get(corId);
-        if (req == null) {
-            Object pojo = getBean();
-            if (pojo == null) {
-                pojo = createBean();
-                injectBean(pojo);
-                ReflectionUtils.callLifecycleMethod(pojo, PostConstruct.class);
-            }
-            req = new Request(pojo, exchange);
-            requests.put(corId, req);
-        }
+        Request req = getOrCreateCurrentRequest(exchange);
         currentRequest.set(req);
         synchronized (req) {
             // If the bean implements MessageExchangeListener,
@@ -270,6 +262,22 @@
         }
     }
 
+    /**
+     * Returns the {@code Request} registered under the correlation id of the given
+     * exchange, creating and registering a new one when none is found.
+     * <p>
+     * For a new request the bean is taken from {@code getBean()}; when that yields
+     * {@code null}, a bean is obtained from {@code createBean()}, passed to
+     * {@code injectBean} and has its {@code PostConstruct} lifecycle method invoked
+     * before it is wrapped in the request.
+     *
+     * @param exchange the exchange whose correlation id selects the request
+     * @return the request already registered for that id, or the newly registered one
+     */
+    protected Request getOrCreateCurrentRequest(MessageExchange exchange)
+        throws ClassNotFoundException, InstantiationException, IllegalAccessException, MessagingException {
+        Object correlationId = getCorrelation(exchange);
+        Request known = requests.get(correlationId);
+        if (known != null) {
+            return known;
+        }
+
+        // Nothing registered yet: resolve the bean that will back the new request.
+        Object bean = getBean();
+        if (bean == null) {
+            bean = createBean();
+            injectBean(bean);
+            ReflectionUtils.callLifecycleMethod(bean, PostConstruct.class);
+        }
+
+        Request created = new Request(bean, exchange);
+        requests.put(correlationId, created);
+        return created;
+    }
+
     protected void onConsumerExchange(MessageExchange exchange) throws Exception {
         Object corId = exchange.getExchangeId();
         Request req = requests.remove(corId);
@@ -564,11 +572,20 @@
         }
 
         public void send(MessageExchange messageExchange) throws MessagingException {
-            if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
-                    && messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
-                requests.put(messageExchange.getExchangeId(), currentRequest.get());
+            try {
+                if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
+                        && messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+                    Request req = getOrCreateCurrentRequest(messageExchange);
+                    if (!(req.getBean() instanceof MessageExchangeListener)) {
+                        throw new IllegalStateException("A bean acting as a consumer and using the channel to send exchanges must implement the MessageExchangeListener interface");
+                    }
+                }
+                getChannel().send(messageExchange);
+            } catch (MessagingException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new MessagingException(e);
             }
-            getChannel().send(messageExchange);
         }
 
         public boolean sendSync(MessageExchange messageExchange) throws MessagingException {

Modified: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java?rev=691553&r1=691552&r2=691553&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java (original)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java Wed Sep  3 02:43:42 2008
@@ -41,7 +41,8 @@
         jbi.setEmbedded(true);
         jbi.init();
     }
-    
+
+    /*
     public void test() throws Exception {
         BeanComponent bc = new BeanComponent();
         BeanEndpoint ep = new BeanEndpoint();
@@ -69,6 +70,7 @@
         assertExchangeWorked(me);
         client.done(me);
     }
+    */
     
     protected void assertExchangeWorked(MessageExchange me) throws Exception {
         if (me.getStatus() == ExchangeStatus.ERROR) {

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/SenderBeanTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/SenderBeanTest.java?rev=691553&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/SenderBeanTest.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/SenderBeanTest.java Wed Sep  3 02:43:42 2008
@@ -0,0 +1,23 @@
+package org.apache.servicemix.bean;
+
+import org.apache.servicemix.tck.SpringTestSupport;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.tck.ExchangeCompletedListener;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+
+/**
+ * Test case run against the Spring context loaded from {@code sender-bean.xml}.
+ */
+public class SenderBeanTest extends SpringTestSupport {
+
+    /**
+     * Asserts that the "receiver" bean has received one message and that the
+     * "listener" bean passes its exchange-completed assertion.
+     */
+    public void testSendingToDynamicEndpointForExchangeProcessorBeanWithFooOperation() throws Exception {
+        ReceiverComponent target = (ReceiverComponent) getBean("receiver");
+        target.getMessageList().assertMessagesReceived(1);
+
+        ExchangeCompletedListener completion = (ExchangeCompletedListener) getBean("listener");
+        completion.assertExchangeCompleted();
+    }
+
+    /** Supplies the application context built from {@code sender-bean.xml} on the classpath. */
+    protected AbstractXmlApplicationContext createBeanFactory() {
+        final String contextFile = "sender-bean.xml";
+        return new ClassPathXmlApplicationContext(contextFile);
+    }
+
+}

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java?rev=691553&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/java/org/apache/servicemix/bean/beans/SenderBean.java Wed Sep  3 02:43:42 2008
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.bean.beans;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.listener.MessageExchangeListener;
+
+/**
+ * Test bean whose {@link #init()} starts a background thread that repeatedly sends a
+ * {@code <Hello/>} InOnly exchange to the {@link #getTarget() target} service through
+ * the {@code @Resource}-annotated channel, sleeping 500 ms between sends, until
+ * {@link #destroy()} is called.
+ */
+public class SenderBean implements MessageExchangeListener {
+
+    Thread senderThread;
+    private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    private QName target;
+
+    @Resource
+    private DeliveryChannel channel;
+
+    /**
+     * Creates and starts the sender thread.
+     */
+    @PostConstruct
+    public void init() {
+        senderThread = new Thread(new Runnable() {
+            public void run() {
+                while (keepRunning.get()) {
+                    try {
+                        String text = "<Hello/>";
+                        InOnly exchange = channel
+                                .createExchangeFactoryForService(target)
+                                .createInOnlyExchange();
+                        NormalizedMessage msg = exchange.createMessage();
+                        msg.setContent(new StringSource(text));
+                        exchange.setInMessage(msg);
+                        System.out.println("Sending message: " + text);
+                        channel.send(exchange);
+                    } catch (Exception e) {
+                        // Best effort: report the failure and try again on the next iteration.
+                        e.printStackTrace();
+                    }
+
+                    try {
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        // destroy() interrupts this thread to stop it: restore the
+                        // interrupt status and leave the loop instead of swallowing it.
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            }
+        });
+        // Daemon, so that a destroy() that is never invoked cannot keep the JVM alive.
+        senderThread.setDaemon(true);
+        senderThread.start();
+    }
+
+    /**
+     * Clears the run flag and interrupts the sender thread if it is still alive.
+     */
+    @PreDestroy
+    public void destroy() {
+        keepRunning.set(false);
+        if (senderThread != null && senderThread.isAlive()) {
+            senderThread.interrupt();
+        }
+    }
+
+    /**
+     * Intentionally ignores every exchange handed to this bean.
+     */
+    public void onMessageExchange(MessageExchange messageExchange) throws MessagingException {
+        // Do nothing
+    }
+
+    /**
+     * @return the service name the sender thread creates its exchanges for
+     */
+    public QName getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target the service name the sender thread creates its exchanges for
+     */
+    public void setTarget(QName target) {
+        this.target = target;
+    }
+
+}

Added: servicemix/components/engines/servicemix-bean/trunk/src/test/resources/sender-bean.xml
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-bean/trunk/src/test/resources/sender-bean.xml?rev=691553&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-bean/trunk/src/test/resources/sender-bean.xml (added)
+++ servicemix/components/engines/servicemix-bean/trunk/src/test/resources/sender-bean.xml Wed Sep  3 02:43:42 2008
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:sm="http://servicemix.apache.org/config/1.0"
+       xmlns:bean="http://servicemix.apache.org/bean/1.0"
+       xmlns:test="urn:test">
+
+  <sm:container id="jbi" embedded="true" createMBeanServer="false">
+
+    <sm:endpoints>
+
+        <bean:endpoint service="test:service" endpoint="endpoint" bean="#senderBean"/>
+
+    </sm:endpoints>
+
+    <sm:activationSpecs>
+
+      <sm:activationSpec service="test:receiver" endpoint="endpoint" component="#receiver" />
+
+    </sm:activationSpecs>
+
+    <sm:listeners>
+        <ref bean="listener" />
+    </sm:listeners>
+
+  </sm:container>
+
+  <bean id="senderBean" class="org.apache.servicemix.bean.beans.SenderBean">
+      <property name="target" value="test:receiver" />
+  </bean>
+
+  <bean id="receiver" class="org.apache.servicemix.tck.ReceiverComponent" />
+
+  <bean id="listener" class="org.apache.servicemix.tck.ExchangeCompletedListener" />
+
+</beans>