You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/02/11 18:07:46 UTC
svn commit: r743405 - in /servicemix/smx4/nmr/trunk/nmr:
api/src/main/java/org/apache/servicemix/nmr/api/service/
core/src/main/java/org/apache/servicemix/nmr/core/
Author: gnodet
Date: Wed Feb 11 17:07:45 2009
New Revision: 743405
URL: http://svn.apache.org/viewvc?rev=743405&view=rev
Log:
SMX4NMR-73: Throw an exception when sending a message to a closed channel
Added:
servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java
Modified:
servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java
servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java
Modified: servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java?rev=743405&r1=743404&r2=743405&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/api/src/main/java/org/apache/servicemix/nmr/api/service/ServiceHelper.java Wed Feb 11 17:07:45 2009
@@ -27,8 +27,8 @@
private ServiceHelper() {
}
- public static Map<String, ?> createMap(String... data) {
- Map<String, String> props = new HashMap<String, String>();
+ public static Map<String, Object> createMap(String... data) {
+ Map<String, Object> props = new HashMap<String, Object>();
for (int i = 0; i < data.length / 2; i++) {
props.put(data[i*2], data[i*2+1]);
}
Added: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java?rev=743405&view=auto
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java (added)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelClosedException.java Wed Feb 11 17:07:45 2009
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.nmr.core;
+
+/**
+ * Exception thrown when using a closed channel.
+ */
+public class ChannelClosedException extends NmrRuntimeException {
+
+ public ChannelClosedException() {
+ }
+
+ public ChannelClosedException(String message) {
+ super(message);
+ }
+
+ public ChannelClosedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ChannelClosedException(Throwable cause) {
+ super(cause);
+ }
+}
Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java?rev=743405&r1=743404&r2=743405&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java Wed Feb 11 17:07:45 2009
@@ -21,6 +21,9 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,7 +42,7 @@
/**
* The {@link org.apache.servicemix.nmr.api.Channel} implementation.
* The channel uses an Executor (usually a thread pool)
- * to delegate
+ * to delegate task executions to.
*
* @version $Revision: $
* @since 4.0
@@ -49,11 +52,12 @@
private static final Log LOG = LogFactory.getLog(NMR.class);
private final InternalEndpoint endpoint;
- private final Executor executor;
+ private final ExecutorService executor;
private final NMR nmr;
private String name;
+ private AtomicBoolean closed = new AtomicBoolean();
- public ChannelImpl(InternalEndpoint endpoint, Executor executor, NMR nmr) {
+ public ChannelImpl(InternalEndpoint endpoint, ExecutorService executor, NMR nmr) {
this.endpoint = endpoint;
this.executor = executor;
this.nmr = nmr;
@@ -153,8 +157,11 @@
* the NMR.
*/
public void close() {
- Map<String,?> props = nmr.getEndpointRegistry().getProperties(endpoint);
- nmr.getEndpointRegistry().unregister(endpoint, props);
+ if (closed.compareAndSet(false, true)) {
+ Map<String,?> props = nmr.getEndpointRegistry().getProperties(endpoint);
+ nmr.getEndpointRegistry().unregister(endpoint, props);
+ executor.shutdown();
+ }
}
/**
@@ -163,6 +170,9 @@
* @param exchange the exchange to delivery
*/
public void deliver(final InternalExchange exchange) {
+ if (closed.get()) {
+ throw new ChannelClosedException();
+ }
// Log the exchange
if (LOG.isTraceEnabled()) {
LOG.trace("Channel " + name + " delivering exchange: " + exchange.display(true));
@@ -177,11 +187,19 @@
return;
}
// Delegate processing to the executor
- this.executor.execute(new Runnable() {
- public void run() {
- process(exchange);
+ try {
+ this.executor.execute(new Runnable() {
+ public void run() {
+ process(exchange);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ if (closed.get()) {
+ throw new ChannelClosedException();
+ } else {
+ throw e;
}
- });
+ }
}
/**
@@ -217,6 +235,9 @@
* @param exchange the exchange to dispatch
*/
protected void dispatch(InternalExchange exchange) {
+ if (closed.get()) {
+ throw new ChannelClosedException();
+ }
// Log the exchange
if (LOG.isTraceEnabled()) {
LOG.trace("Channel " + name + " dispatching exchange: " + exchange.display(true));
Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java?rev=743405&r1=743404&r2=743405&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/EndpointRegistryImpl.java Wed Feb 11 17:07:45 2009
@@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import org.w3c.dom.Document;
@@ -99,7 +100,7 @@
public void register(Endpoint endpoint, Map<String, ?> properties) {
InternalEndpointWrapper wrapper = new InternalEndpointWrapper(endpoint, properties);
if (endpoints.putIfAbsent(endpoint, wrapper) == null) {
- Executor executor = Executors.newCachedThreadPool();
+ ExecutorService executor = Executors.newCachedThreadPool();
ChannelImpl channel = new ChannelImpl(wrapper, executor, nmr);
wrapper.setChannel(channel);
wrappers.put(wrapper, endpoint);
@@ -138,6 +139,7 @@
}
}
if (wrapper != null) {
+ wrapper.getChannel().close();
registry.unregister(wrapper, properties);
for (EndpointListener listener : nmr.getListenerRegistry().getListeners(EndpointListener.class)) {
listener.endpointUnregistered(wrapper);