You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/20 06:44:25 UTC

svn commit: r520287 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/queue/ camel-core/src/test/java/org/apache/camel/queue/ camel-jms/src/m...

Author: chirino
Date: Mon Mar 19 22:44:24 2007
New Revision: 520287

URL: http://svn.apache.org/viewvc?view=rev&rev=520287
Log:
Got rid the the activate/deactivate methods on Component since they look better on Endpoint..
but I did add similar methods to the Container.
Also repliated the JmsRouteTest as a QueueRouteTest


Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java Mon Mar 19 22:44:24 2007
@@ -17,15 +17,15 @@
  */
 package org.apache.camel;
 
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultEndpointResolver;
-import org.apache.camel.impl.DefaultExchangeConverter;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultEndpointResolver;
+import org.apache.camel.impl.DefaultExchangeConverter;
+
 /**
  * Represents the container used to configure routes and the policies to use.
  *
@@ -37,19 +37,34 @@
     private EndpointResolver<E> endpointResolver;
     private ExchangeConverter exchangeConverter;
     private Map<String, Component> components = new HashMap<String, Component>();
+	private Map<Endpoint<E>, Processor<E>> routes;
+    
+    /**
+     * Activates all the starting endpoints in that were added as routes.
+     */
+    public void activateEndpoints() {
+        for (Map.Entry<Endpoint<E>, Processor<E>> entry : routes.entrySet()) {
+            Endpoint<E> endpoint = entry.getKey();
+            Processor<E> processor = entry.getValue();
+            endpoint.activate(processor);
+        }
+    }
+    
+    /**
+     * Deactivates all the starting endpoints in that were added as routes.
+     */
+    public void deactivateEndpoints() {
+        for (Endpoint<E> endpoint : routes.keySet()) {
+            endpoint.deactivate();
+        }
+    }
 
     // Builder APIs
     //-----------------------------------------------------------------------
     public void routes(RouteBuilder<E> builder) {
         // lets now add the routes from the builder
         builder.setContainer(this);
-        Map<Endpoint<E>, Processor<E>> routeMap = builder.getRouteMap();
-        Set<Map.Entry<Endpoint<E>, Processor<E>>> entries = routeMap.entrySet();
-        for (Map.Entry<Endpoint<E>, Processor<E>> entry : entries) {
-            Endpoint<E> endpoint = entry.getKey();
-            Processor<E> processor = entry.getValue();
-            endpoint.setInboundProcessor(processor);
-        }
+        routes = builder.getRouteMap();
     }
 
     public void routes(final RouteFactory factory) {
@@ -64,18 +79,18 @@
     /**
      * Adds a component to the container if there is not currently a component already registered.
      */
-    public void addComponent(String componentName, final Component<E, ? extends Endpoint<E>> component) {
+    public void addComponent(String componentName, final Component<E> component) {
         // TODO provide a version of this which barfs if the component is registered multiple times
 
-        getOrCreateComponent(componentName, new Callable<Component<E, ? extends Endpoint<E>>>() {
-            public Component<E, ? extends Endpoint<E>> call() throws Exception {
+        getOrCreateComponent(componentName, new Callable<Component<E>>() {
+            public Component<E> call() throws Exception {
                 return component;
             }
         });
     }
 
 
-    /**
+    /**O
      * Resolves the given URI to an endpoint
      */
     public Endpoint<E> endpoint(String uri) {
@@ -121,7 +136,7 @@
         return new DefaultExchangeConverter();
     }
 
-    public Component getOrCreateComponent(String componentName, Callable<Component<E, ? extends Endpoint<E>>> factory) {
+    public Component getOrCreateComponent(String componentName, Callable<Component<E>> factory) {
         synchronized (components) {
             Component component = components.get(componentName);
             if (component == null) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java Mon Mar 19 22:44:24 2007
@@ -21,23 +21,12 @@
  *
  * @version $Revision: 519901 $
  */
-public interface Component<E, EP extends Endpoint<E>>  {
+public interface Component<E>  {
 
     /**
      * The CamelContainer is injected into the component when it is added to it
      */
     void setContainer(CamelContainer container);
 
-    /**
-	 * Asks the component to activate the delivery of {@link Exchange} objects
-	 * from the {@link Endpoint} to the {@link Processor}.
-	 */
-	void activate(EP endpoint, Processor<E> processor);
-
-	/**
-	 * Stops the delivery of messages from a previously activated 
-	 * {@link Endpoint}.
-	 */
-	void deactivate(EP endpoint);
 	
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Mon Mar 19 22:44:24 2007
@@ -16,6 +16,7 @@
  */
 package org.apache.camel;
 
+
 /**
  * Represents an endpoint on which messages can be exchanged
  *
@@ -32,11 +33,6 @@
      * Sends the message exchange to this endpoint
      */
     void send(E exchange);
-
-    /**
-     * Sets the processor for inbound messages
-     */
-    void setInboundProcessor(Processor<E> processor);
     
     /**
      * Create a new exchange for communicating with this endpoint
@@ -45,9 +41,13 @@
 
 
     /**
-     * Called by the container when an endpoint is activiated
+     * Called by the container to Activate the endpoint.  Once activated,
+     * the endpoint will start delivering messages inbound exchanges
+     * it receives to the specified processor.
+     * 
+     * @throws IllegalStateException is the Endpoint has already been activated.
      */
-    void activate();
+	public void activate(Processor<E> processor) throws IllegalStateException;
 
     /**
      * Called by the container when the endpoint is deactivated

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Mon Mar 19 22:44:24 2007
@@ -77,9 +77,12 @@
     }
 
 
-    public void activate() {
+    public void activate(Processor<E> inboundProcessor) {
         if (activated.compareAndSet(false, true)) {
+        	this.inboundProcessor = inboundProcessor;
             doActivate();
+        } else {
+        	throw new IllegalStateException("Endpoint is already active: "+getEndpointUri());
         }
     }
     public void deactivate() {
@@ -97,7 +100,6 @@
 
     public void setInboundProcessor(Processor<E> inboundProcessor) {
         this.inboundProcessor = inboundProcessor;
-        activate();
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java Mon Mar 19 22:44:24 2007
@@ -42,7 +42,6 @@
 		return resolver.resolveEndpoint(container, uri);
     }
 
-
 	public Component resolveComponent(CamelContainer container, String uri) {
     	EndpointResolver resolver = getDelegate(uri);
 		return resolver.resolveComponent(container, uri);

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java Mon Mar 19 22:44:24 2007
@@ -16,14 +16,12 @@
  */
 package org.apache.camel.queue;
 
-import org.apache.camel.CamelContainer;
-import org.apache.camel.Component;
-import org.apache.camel.Processor;
-
 import java.util.HashMap;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
 
 /**
  * Represents the component that manages {@link QueueEndpoint}.  It holds the 
@@ -32,45 +30,17 @@
  * @org.apache.xbean.XBean
  * @version $Revision: 519973 $
  */
-public class QueueComponent<E> implements Component<E, QueueEndpoint<E>> {
+public class QueueComponent<E> implements Component<E> {
 	
-    private HashMap<String, Queue<E>> registry = new HashMap<String, Queue<E>>();
-    private HashMap<QueueEndpoint<E>, Activation> activations = new HashMap<QueueEndpoint<E>, Activation>();
+    private HashMap<String, BlockingQueue<E>> registry = new HashMap<String, BlockingQueue<E>>();
     private CamelContainer container;
 
     public void setContainer(CamelContainer container) {
         this.container = container;
     }
 
-    class Activation implements Runnable {
-		private final QueueEndpoint<E> endpoint;
-		AtomicBoolean stop = new AtomicBoolean();
-		private Thread thread;
-		
-		public Activation(QueueEndpoint<E> endpoint) {
-			this.endpoint = endpoint;
-		}
-
-		public void run() {
-			while(!stop.get()) {
-				
-			}
-		}
-
-		public void start() {
-			thread = new Thread(this, endpoint.getEndpointUri());
-			thread.setDaemon(true);
-			thread.start();
-		}
-
-		public void stop() throws InterruptedException {
-			stop.set(true);
-			thread.join();
-		}
-    }
-
-	synchronized public Queue<E> getOrCreateQueue(String uri) {
-		Queue<E> queue = registry.get(uri);
+	synchronized public BlockingQueue<E> getOrCreateQueue(String uri) {
+		BlockingQueue<E> queue = registry.get(uri);
 		if( queue == null ) {
 			queue = createQueue();
 			registry.put(uri, queue);
@@ -78,30 +48,13 @@
 		return queue;
 	}
 
-	private Queue<E> createQueue() {
+	protected BlockingQueue<E> createQueue() {
 		return new LinkedBlockingQueue<E>();
 	}
 
-	public void activate(QueueEndpoint<E> endpoint, Processor<E> processor) {
-		Activation activation = activations.get(endpoint);
-		if( activation!=null ) {
-			throw new IllegalArgumentException("Endpoint "+endpoint.getEndpointUri()+" has already been activated.");
-		}
-		
-		activation = new Activation(endpoint);
-		activation.start();
+	public CamelContainer getContainer() {
+		return container;
 	}
 
-	public void deactivate(QueueEndpoint<E> endpoint) {
-		Activation activation = activations.remove(endpoint);
-		if( activation==null ) {
-			throw new IllegalArgumentException("Endpoint "+endpoint.getEndpointUri()+" is not activate.");
-		}		
-		try {
-			activation.stop();
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		}
-	}
 
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java Mon Mar 19 22:44:24 2007
@@ -16,24 +16,28 @@
  */
 package org.apache.camel.queue;
 
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.camel.CamelContainer;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 
-import java.util.Queue;
-
 /**
- * Represents a queue endpoint that uses a {@link Queue}
+ * Represents a queue endpoint that uses a {@link BlockingQueue}
  * object to process inbound exchanges.
  *
  * @org.apache.xbean.XBean
  * @version $Revision: 519973 $
  */
 public class QueueEndpoint<E> extends DefaultEndpoint<E> {
-    private Queue<E> queue;
+    private BlockingQueue<E> queue;
+	private org.apache.camel.queue.QueueEndpoint.Activation activation;
 
-    public QueueEndpoint(String uri, CamelContainer container, Queue<E> queue) {
+    public QueueEndpoint(String uri, CamelContainer container, BlockingQueue<E> queue) {
         super(uri, container);
         this.queue = queue;
     }
@@ -55,5 +59,60 @@
 
     public Queue<E> getQueue() {
         return queue;
+    }
+    
+    class Activation implements Runnable {
+		AtomicBoolean stop = new AtomicBoolean();
+		private Thread thread;
+		
+		public void run() {
+			while(!stop.get()) {
+				E exchange=null;
+				try {
+					exchange = queue.poll(100, TimeUnit.MILLISECONDS);
+				} catch (InterruptedException e) {
+					break;
+				}
+				if( exchange !=null ) {
+					try {
+						getInboundProcessor().onExchange(exchange);
+					} catch (Throwable e) {
+						e.printStackTrace();
+					}
+				}
+			}
+		}
+
+		public void start() {
+			thread = new Thread(this, getEndpointUri());
+			thread.setDaemon(true);
+			thread.start();
+		}
+
+		public void stop() throws InterruptedException {
+			stop.set(true);
+			thread.join();
+		}
+		
+		@Override
+		public String toString() {
+			return "Activation: "+getEndpointUri();
+		}
+    }
+
+    @Override
+    protected void doActivate() {
+		activation = new Activation();
+		activation.start();
+    }
+    
+    @Override
+    protected void doDeactivate() {
+		try {
+			activation.stop();
+			activation=null;
+		} catch (InterruptedException e) {
+			throw new RuntimeException(e);
+		}
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java Mon Mar 19 22:44:24 2007
@@ -16,7 +16,7 @@
  */
 package org.apache.camel.queue;
 
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 
 import org.apache.camel.CamelContainer;
@@ -61,7 +61,7 @@
 	public Endpoint<E> resolveEndpoint(CamelContainer container, String uri) {
 		String id[] = getEndpointId(uri);        
     	QueueComponent<E> component = resolveQueueComponent(container, id[0]);  
-        Queue<E> queue = component.getOrCreateQueue(id[1]);
+    	BlockingQueue<E> queue = component.getOrCreateQueue(id[1]);
 		return new QueueEndpoint<E>(uri, container, queue);
     }
 
@@ -82,8 +82,8 @@
 	
 	@SuppressWarnings("unchecked")
 	private QueueComponent<E> resolveQueueComponent(CamelContainer container, String componentName) {
-    	Component rc = container.getOrCreateComponent(componentName, new Callable<Component<E,? extends Endpoint<E>>>(){
-			public Component<E, ? extends Endpoint<E>> call() throws Exception {
+    	Component rc = container.getOrCreateComponent(componentName, new Callable<Component<E>>(){
+			public Component<E> call() throws Exception {
 				return new QueueComponent<E>();
 			}});
     	return (QueueComponent<E>) rc;

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java?view=auto&rev=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java Mon Mar 19 22:44:24 2007
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.queue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultExchange;
+
+/**
+ * @version $Revision: 520220 $
+ */
+public class QueueRouteTest extends TestCase {
+	
+	static class StringExchange extends DefaultExchange<String, String, String> {		
+	}
+	
+    public void testJmsRoute() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        CamelContainer container = new CamelContainer();
+
+        // lets add some routes
+        container.routes(new RouteBuilder() {
+            public void configure() {
+                from("queue:test.a").to("queue:test.b");
+                from("queue:test.b").process(new Processor<StringExchange>() {
+                    public void onExchange(StringExchange exchange) {
+                        System.out.println("Received exchange: " + exchange.getRequest());
+                        latch.countDown();
+                    }
+                });
+            }
+        });
+
+        
+        container.activateEndpoints();
+        
+        // now lets fire in a message
+        Endpoint<StringExchange> endpoint = container.endpoint("queue:test.a");
+        StringExchange exchange = new StringExchange();
+        exchange.setHeader("cheese", 123);
+        endpoint.send(exchange);
+
+        // now lets sleep for a while
+        boolean received = latch.await(5, TimeUnit.SECONDS);
+        assertTrue("Did not recieve the message!", received);
+
+        container.deactivateEndpoints();
+    }
+}

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java Mon Mar 19 22:44:24 2007
@@ -32,7 +32,7 @@
 /**
  * @version $Revision$
  */
-public class JmsComponent implements Component<JmsExchange, JmsEndpoint> {
+public class JmsComponent implements Component<JmsExchange> {
     public static final String QUEUE_PREFIX = "queue/";
     public static final String TOPIC_PREFIX = "topic/";
 

Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Mon Mar 19 22:44:24 2007
@@ -55,6 +55,9 @@
             }
         });
 
+        
+        container.activateEndpoints();
+        
         // now lets fire in a message
         Endpoint<JmsExchange> endpoint = container.endpoint("jms:activemq:test.a");
         JmsExchange exchange2 = endpoint.createExchange();
@@ -66,7 +69,6 @@
         boolean received = latch.await(5, TimeUnit.SECONDS);
         assertTrue("Did not recieve the message!", received);
 
-        // TODO
-        //container.stop();
+        container.deactivateEndpoints();
     }
 }