You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by an...@apache.org on 2009/09/14 17:14:34 UTC

svn commit: r814685 - in /tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host: ./ ASFListener.java DefaultJMSHostExtensionPointImpl.java JMSServiceListenerFactoryImpl.java ServiceInvoker.java

Author: antelder
Date: Mon Sep 14 15:14:34 2009
New Revision: 814685

URL: http://svn.apache.org/viewvc?rev=814685&view=rev
Log:
Add the jms host impl classes

Added:
    tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/
    tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ASFListener.java
    tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/DefaultJMSHostExtensionPointImpl.java
    tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/JMSServiceListenerFactoryImpl.java
    tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ServiceInvoker.java

Added: tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ASFListener.java
URL: http://svn.apache.org/viewvc/tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ASFListener.java?rev=814685&view=auto
==============================================================================
--- tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ASFListener.java (added)
+++ tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ASFListener.java Mon Sep 14 15:14:34 2009
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+
+package org.apache.tuscany.sca.binding.jms.host;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.NamingException;
+
+import org.apache.tuscany.sca.binding.jms.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.JMSBindingConstants;
+import org.apache.tuscany.sca.binding.jms.JMSBindingException;
+import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory;
+import org.apache.tuscany.sca.host.jms.JMSServiceListener;
+import org.apache.tuscany.sca.work.WorkScheduler;
+
+/**
+ * Implementation of the JMS service binding provider.
+ * 
+ * @version $Rev: 721811 $ $Date: 2008-11-30 13:46:51 +0000 (Sun, 30 Nov 2008) $
+ */
+public class ASFListener implements JMSServiceListener {
+    private static final Logger logger = Logger.getLogger(ASFListener.class.getName());
+
+    private MessageListener listener;
+    private String serviceName;
+    private boolean isCallbackService;
+    private JMSBinding jmsBinding;
+    private WorkScheduler workScheduler;
+
+    private JMSResourceFactory jmsResourceFactory;
+    private MessageConsumer consumer;
+    private boolean running;
+
+    private Destination destination;
+
+    public ASFListener(MessageListener listener, String serviceName, boolean isCallbackService, JMSBinding jmsBinding, WorkScheduler workScheduler, JMSResourceFactory rf) {
+        this.listener = listener;
+        this.serviceName = serviceName;
+        this.isCallbackService = isCallbackService;
+        this.jmsBinding = jmsBinding;
+        this.workScheduler = workScheduler;
+        this.jmsResourceFactory = rf;
+    }
+    
+    public void start() {
+        this.running = true;
+
+        try {
+            registerListerner();
+        } catch (Exception e) {
+            throw new JMSBindingException("Error starting JMSServiceBinding", e);
+        }
+    }
+
+    public void stop() {
+        this.running = false;
+        try {
+            consumer.close();
+            jmsResourceFactory.closeConnection();
+            jmsResourceFactory.closeResponseConnection();
+        } catch (Exception e) {
+            // if using an embedded broker then when shutting down Tuscany the broker may get closed
+            // before this stop method is called. I can't see how to detect that so for now just
+            // ignore the exception if the message is that the transport is already disposed
+            if (!"Transport disposed.".equals(e.getMessage())) {
+                throw new JMSBindingException("Error stopping JMSServiceBinding", e);
+            }
+        }
+    }
+
+    private void registerListerner() throws NamingException, JMSException {
+
+        Session session = jmsResourceFactory.createSession();
+        destination = lookupDestinationQueue();
+        if (destination == null) {
+            destination = session.createTemporaryQueue();
+        }
+
+        if (jmsBinding.getJMSSelector() != null) {
+            consumer = session.createConsumer(destination, jmsBinding.getJMSSelector());
+        } else {
+            consumer = session.createConsumer(destination);
+        }
+
+        try {
+
+            consumer.setMessageListener(listener);
+            jmsResourceFactory.startConnection();
+
+        } catch (javax.jms.JMSException e) {
+
+            // setMessageListener not allowed in JEE container so use Tuscany threads
+
+            jmsResourceFactory.startConnection();
+            workScheduler.scheduleWork(new Runnable() {
+                public void run() {
+                    try {
+                        while (running) {
+                            final Message msg = consumer.receive();
+                            workScheduler.scheduleWork(new Runnable() {
+                                public void run() {
+                                    try {
+                                        listener.onMessage(msg);
+                                    } catch (Exception e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+        logger.log(Level.INFO, "JMS " + (isCallbackService ? "callback service" : "service")
+            + " '"
+            + serviceName
+            + "' listening on destination "
+            + ((destination instanceof Queue) ? ((Queue)destination).getQueueName() : ((Topic)destination).getTopicName()));
+    }
+
+    /**
+     * Looks up the Destination Queue for the JMS Binding.
+     * <p>
+     * What happens in the look up will depend on the create mode specified for the JMS Binding:
+     * <ul>
+     * <li>always - the JMS queue is always created. It is an error if the queue already exists
+     * <li>ifnotexist - the JMS queue is created if it does not exist. It is not an error if the queue already exists
+     * <li>never - the JMS queue is never created. It is an error if the queue does not exist
+     * </ul>
+     * See the SCA JMS Binding specification for more information.
+     * <p>
+     * 
+     * @return The Destination queue.
+     * @throws NamingException Failed to lookup JMS queue
+     * @throws JMSBindingException Failed to lookup JMS Queue. Probable cause is that the JMS queue's current existence/non-existence is not
+     *                 compatible with the create mode specified on the binding
+     */
+    private Destination lookupDestinationQueue() throws NamingException, JMSBindingException {
+
+        if (isCallbackService && (jmsBinding.getDestinationName() == null)) {
+            // if its a callback service returning null indicates to use a temporary queue
+            return null;
+        }
+
+        Destination destination = jmsResourceFactory.lookupDestination(jmsBinding.getDestinationName());
+
+        String qCreateMode = jmsBinding.getDestinationCreate();
+        if (qCreateMode.equals(JMSBindingConstants.CREATE_ALWAYS)) {
+            // In this mode, the queue must not already exist as we are creating it
+            if (destination != null) {
+                throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName()
+                    + " already exists but has create mode of \""
+                    + qCreateMode
+                    + "\" while registering service "
+                    + serviceName
+                    + " listener");
+            }
+
+            // Create the queue
+            destination = jmsResourceFactory.createDestination(jmsBinding.getDestinationName());
+
+        } else if (qCreateMode.equals(JMSBindingConstants.CREATE_IF_NOT_EXIST)) {
+            // In this mode, the queue may nor may not exist. It will be created if it does not exist
+            if (destination == null) {
+                destination = jmsResourceFactory.createDestination(jmsBinding.getDestinationName());
+            }
+
+        } else if (qCreateMode.equals(JMSBindingConstants.CREATE_NEVER)) {
+            // In this mode, the queue must have already been created.
+            if (destination == null) {
+                throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName()
+                    + " not found but create mode of \""
+                    + qCreateMode
+                    + "\" while registering service "
+                    + serviceName
+                    + " listener");
+            }
+        }
+
+        // Make sure we ended up with a queue
+        if (destination == null) {
+            throw new JMSBindingException("JMS Destination " + jmsBinding.getDestinationName()
+                + " not found with create mode of \""
+                + qCreateMode
+                + "\" while registering service "
+                + serviceName
+                + " listener");
+        }
+
+        return destination;
+    }
+
+    public String getDestinationName() {
+        try {
+            if (destination instanceof Queue) {
+                return ((Queue)destination).getQueueName();
+            } else if (destination instanceof Topic) {
+                return ((Topic)destination).getTopicName();
+            } else {
+                return null;
+            }
+        } catch (JMSException e) {
+            throw new JMSBindingException(e);
+        }
+    }
+
+}

Added: tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/DefaultJMSHostExtensionPointImpl.java
URL: http://svn.apache.org/viewvc/tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/DefaultJMSHostExtensionPointImpl.java?rev=814685&view=auto
==============================================================================
--- tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/DefaultJMSHostExtensionPointImpl.java (added)
+++ tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/DefaultJMSHostExtensionPointImpl.java Mon Sep 14 15:14:34 2009
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+
+package org.apache.tuscany.sca.binding.jms.host;
+
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
+import org.apache.tuscany.sca.host.jms.JMSHostExtensionPoint;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory;
+import org.apache.tuscany.sca.work.WorkScheduler;
+
+public class DefaultJMSHostExtensionPointImpl implements JMSHostExtensionPoint {
+
+    private JMSServiceListenerFactory jmsServiceListenerFactory;
+
+    public DefaultJMSHostExtensionPointImpl(ExtensionPointRegistry extensionPoints) {
+        UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class);
+        WorkScheduler workScheduler = utilities.getUtility(WorkScheduler.class);
+        this.jmsServiceListenerFactory = new JMSServiceListenerFactoryImpl(workScheduler);
+    }
+
+    public JMSServiceListenerFactory getJMSServiceListenerFactory() {
+        return jmsServiceListenerFactory;
+    }
+
+}

Added: tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/JMSServiceListenerFactoryImpl.java
URL: http://svn.apache.org/viewvc/tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/JMSServiceListenerFactoryImpl.java?rev=814685&view=auto
==============================================================================
--- tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/JMSServiceListenerFactoryImpl.java (added)
+++ tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/JMSServiceListenerFactoryImpl.java Mon Sep 14 15:14:34 2009
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+
+package org.apache.tuscany.sca.binding.jms.host;
+
+import javax.jms.MessageListener;
+import javax.naming.NamingException;
+
+import org.apache.tuscany.sca.binding.jms.JMSBindingException;
+import org.apache.tuscany.sca.binding.jms.provider.JMSBindingServiceBindingProvider;
+import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory;
+import org.apache.tuscany.sca.host.jms.JMSServiceListener;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerDetails;
+import org.apache.tuscany.sca.host.jms.JMSServiceListenerFactory;
+import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.work.WorkScheduler;
+
+public class JMSServiceListenerFactoryImpl implements JMSServiceListenerFactory {
+
+    private WorkScheduler workScheduler;
+
+    public JMSServiceListenerFactoryImpl(WorkScheduler workScheduler) {
+        this.workScheduler = workScheduler;
+    }
+
+    public JMSServiceListener createJMSServiceListener(JMSServiceListenerDetails jmsSLD) {
+        try {
+
+            JMSResourceFactory rf = ((JMSBindingServiceBindingProvider)jmsSLD).getResourceFactory();
+            
+            MessageListener listener = new ServiceInvoker(jmsSLD.getJmsBinding(), jmsSLD.getService(), jmsSLD.getTargetBinding(), jmsSLD.getMessageFactory(), rf);
+            RuntimeComponentService service = jmsSLD.getService();
+
+//            return new ASFListener(listener, service.getName(), service.isCallback(), jmsSLD.getJmsBinding(), workScheduler, rf);
+// TODO: 2.x migration, service.isCallback()             
+            return new ASFListener(listener, service.getName(), false, jmsSLD.getJmsBinding(), workScheduler, rf);
+
+        } catch (NamingException e) {
+            throw new JMSBindingException(e);
+        }
+    }
+}

Added: tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ServiceInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ServiceInvoker.java?rev=814685&view=auto
==============================================================================
--- tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ServiceInvoker.java (added)
+++ tuscany/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/host/ServiceInvoker.java Mon Sep 14 15:14:34 2009
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.tuscany.sca.binding.jms.host;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.naming.NamingException;
+
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.binding.jms.JMSBinding;
+import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext;
+import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory;
+import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+
+/**
+ * TODO RRB experiement
+ * Listener for the JMSBinding.
+ * 
+ * @version $Rev: 721811 $ $Date: 2008-11-30 13:46:51 +0000 (Sun, 30 Nov 2008) $
+ */
+public class ServiceInvoker implements MessageListener {
+
+    private static final Logger logger = Logger.getLogger(ServiceInvoker.class.getName());
+
+    private JMSBinding jmsBinding;
+    private Binding targetBinding;
+    private JMSResourceFactory jmsResourceFactory;
+    private RuntimeComponentService service;
+    private MessageFactory messageFactory;
+
+    public ServiceInvoker(JMSBinding jmsBinding, RuntimeComponentService service, Binding targetBinding, MessageFactory messageFactory, JMSResourceFactory rf) throws NamingException {
+        this.jmsBinding = jmsBinding;
+        this.jmsResourceFactory = rf;
+        this.service = service;
+        this.targetBinding = targetBinding;
+        this.messageFactory = messageFactory;
+        
+    }
+
+    public void onMessage(Message requestJMSMsg) {
+        logger.log(Level.FINE, "JMS service '" + service.getName() + "' received message " + requestJMSMsg);
+        try {
+            invokeService(requestJMSMsg);
+        } catch (Throwable e) {
+            logger.log(Level.SEVERE, "Exception send fault response '" + service.getName(), e);
+        }
+    }
+
+    protected void invokeService(Message requestJMSMsg) throws JMSException, InvocationTargetException {
+
+        // create the tuscany message
+        org.apache.tuscany.sca.invocation.Message tuscanyMsg = messageFactory.createMessage();
+        
+        // populate the message context with JMS binding information
+        JMSBindingContext context = new JMSBindingContext();
+        tuscanyMsg.setBindingContext(context);
+        
+        context.setJmsMsg(requestJMSMsg);
+        context.setJmsResourceFactory(jmsResourceFactory);
+        context.setReplyToDestination(requestJMSMsg.getJMSReplyTo());
+        
+        // set the message body
+        tuscanyMsg.setBody(requestJMSMsg);
+        
+        // call the runtime wire - the response is handled by the 
+        // transport interceptor
+        service.getRuntimeWire(targetBinding).invoke(tuscanyMsg);
+            
+    }   
+
+}