You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/02/11 18:07:46 UTC

svn commit: r743405 - in /servicemix/smx4/nmr/trunk/nmr: api/src/main/java/org/apache/servicemix/nmr/api/service/ core/src/main/java/org/apache/servicemix/nmr/core/

Author: gnodet
Date: Wed Feb 11 17:07:45 2009
New Revision: 743405

URL: http://svn.apache.org/viewvc?rev=743405&view=rev
Log:
SMX4NMR-73: Throw an exception when sending a message to a closed channel

Added:
    servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java
Modified:
    servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java
    servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
    servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java

Modified: servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java?rev=743405&r1=743404&r2=743405&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java Wed Feb 11 17:07:45 2009
@@ -27,8 +27,8 @@
     private ServiceHelper() {
     }
 
-    public static Map<String, ?> createMap(String... data) {
-        Map<String, String> props = new HashMap<String, String>();
+    public static Map<String, Object> createMap(String... data) {
+        Map<String, Object> props = new HashMap<String, Object>();
         for (int i = 0; i < data.length / 2; i++) {
             props.put(data[i*2], data[i*2+1]);
         }

Added: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java?rev=743405&view=auto
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java (added)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java Wed Feb 11 17:07:45 2009
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.nmr.core;
+
+/**
+ * Exception thrown when an exchange is delivered or dispatched on a channel that has already been closed.
+ */
+public class ChannelClosedException extends NmrRuntimeException {
+
+    public ChannelClosedException() {
+    }
+
+    public ChannelClosedException(String message) {
+        super(message);
+    }
+
+    public ChannelClosedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ChannelClosedException(Throwable cause) {
+        super(cause);
+    }
+}

Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java?rev=743405&r1=743404&r2=743405&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java Wed Feb 11 17:07:45 2009
@@ -21,6 +21,9 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,7 +42,7 @@
 /**
  * The {@link org.apache.servicemix.nmr.api.Channel} implementation.
  * The channel uses an Executor (usually a thread pool)
- * to delegate
+ * to delegate task executions to.
  *
  * @version $Revision: $
  * @since 4.0
@@ -49,11 +52,12 @@
     private static final Log LOG = LogFactory.getLog(NMR.class);
 
     private final InternalEndpoint endpoint;
-    private final Executor executor;
+    private final ExecutorService executor;
     private final NMR nmr;
     private String name;
+    private AtomicBoolean closed = new AtomicBoolean();
 
-    public ChannelImpl(InternalEndpoint endpoint, Executor executor, NMR nmr) {
+    public ChannelImpl(InternalEndpoint endpoint, ExecutorService executor, NMR nmr) {
         this.endpoint = endpoint;
         this.executor = executor;
         this.nmr = nmr;
@@ -153,8 +157,11 @@
      * the NMR.
      */
     public void close() {
-        Map<String,?> props = nmr.getEndpointRegistry().getProperties(endpoint);
-        nmr.getEndpointRegistry().unregister(endpoint, props);
+        if (closed.compareAndSet(false, true)) {
+            Map<String,?> props = nmr.getEndpointRegistry().getProperties(endpoint);
+            nmr.getEndpointRegistry().unregister(endpoint, props);
+            executor.shutdown();
+        }
     }
 
     /**
@@ -163,6 +170,9 @@
      * @param exchange the exchange to delivery
      */
     public void deliver(final InternalExchange exchange) {
+        if (closed.get()) {
+            throw new ChannelClosedException();
+        }
         // Log the exchange
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel " + name + " delivering exchange: " + exchange.display(true));
@@ -177,11 +187,19 @@
             return;
         }
         // Delegate processing to the executor
-        this.executor.execute(new Runnable() {
-            public void run() {
-                process(exchange);
+        try {
+            this.executor.execute(new Runnable() {
+                public void run() {
+                    process(exchange);
+                }
+            });
+        } catch (RejectedExecutionException e) {
+            if (closed.get()) {
+                throw new ChannelClosedException();
+            } else {
+                throw e;
             }
-        });
+        }
     }
 
     /**
@@ -217,6 +235,9 @@
      * @param exchange the exchange to dispatch
      */
     protected void dispatch(InternalExchange exchange) {
+        if (closed.get()) {
+            throw new ChannelClosedException();
+        }
         // Log the exchange
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel " + name + " dispatching exchange: " + exchange.display(true));

Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java?rev=743405&r1=743404&r2=743405&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java Wed Feb 11 17:07:45 2009
@@ -25,6 +25,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 import org.w3c.dom.Document;
 
@@ -99,7 +100,7 @@
     public void register(Endpoint endpoint, Map<String, ?> properties) {
         InternalEndpointWrapper wrapper = new InternalEndpointWrapper(endpoint, properties);
         if (endpoints.putIfAbsent(endpoint, wrapper) == null) {
-            Executor executor = Executors.newCachedThreadPool();
+            ExecutorService executor = Executors.newCachedThreadPool();
             ChannelImpl channel = new ChannelImpl(wrapper, executor, nmr);
             wrapper.setChannel(channel);
             wrappers.put(wrapper, endpoint);
@@ -138,6 +139,7 @@
             }
         }
         if (wrapper != null) {
+            wrapper.getChannel().close();
             registry.unregister(wrapper, properties);
             for (EndpointListener listener : nmr.getListenerRegistry().getListeners(EndpointListener.class)) {
                 listener.endpointUnregistered(wrapper);