You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/03/10 20:51:09 UTC

svn commit: r752231 - in /servicemix/smx4/nmr/trunk/jbi/deployer/src: main/java/org/apache/servicemix/jbi/deployer/artifacts/ main/java/org/apache/servicemix/jbi/deployer/impl/ main/resources/META-INF/spring/ test/java/org/apache/servicemix/jbi/deploye...

Author: gertv
Date: Tue Mar 10 19:51:08 2009
New Revision: 752231

URL: http://svn.apache.org/viewvc?rev=752231&view=rev
Log:
SMX4NMR-118: Add a shutdown monitor thread to the SA with a configurable timeout

Added:
    servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java   (with props)
Modified:
    servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java
    servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java
    servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java
    servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml

Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java Tue Mar 10 19:51:08 2009
@@ -16,11 +16,15 @@
  */
 package org.apache.servicemix.jbi.deployer.artifacts;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicemix.jbi.deployer.ServiceAssembly;
+import org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl;
+import org.apache.servicemix.nmr.api.Endpoint;
 import org.apache.servicemix.nmr.api.Exchange;
 import org.apache.servicemix.nmr.api.Role;
 import org.apache.servicemix.nmr.api.Status;
@@ -43,6 +47,7 @@
     private final ConcurrentMap<InternalEndpoint, ServiceAssembly> endpoints = new ConcurrentHashMap<InternalEndpoint, ServiceAssembly>();
     private final ConcurrentMap<ServiceAssembly, AtomicInteger> references = new ConcurrentHashMap<ServiceAssembly, AtomicInteger>();
     private final ConcurrentMap<ServiceAssembly, Object> locks = new ConcurrentHashMap<ServiceAssembly, Object>();
+    private final ConcurrentMap<InternalExchange, ServiceAssembly> pending = new ConcurrentHashMap<InternalExchange, ServiceAssembly>();
 
     public void setAssembly(ServiceAssembly assembly) {
         this.assembly.set(assembly);
@@ -89,10 +94,22 @@
                 // Increment reference to the source SA
                 InternalExchange ie = (InternalExchange) exchange;
                 reference(ie.getSource());
+                if (isSync(exchange)) {
+                    pending(ie);
+                }
             }
         }
     }
 
+    private boolean isSync(Exchange exchange) {
+        return exchange.getProperty(DeliveryChannelImpl.SEND_SYNC) != null && exchange.getProperty(DeliveryChannelImpl.SEND_SYNC, Boolean.class).booleanValue();
+    }
+
+    private void pending(InternalExchange exchange) {
+        ServiceAssembly assembly = endpoints.get(exchange.getSource());
+        pending.put(exchange, assembly);
+    }
+
     public void exchangeDelivered(Exchange exchange) {
         // Check if the exchange is finished
         if (exchange.getStatus() != Status.Active) {
@@ -101,6 +118,7 @@
                 // Decrement references to source and destination SA
                 unreference(ie.getSource());
                 unreference(ie.getDestination());
+                pending.remove(exchange);
             }
             // Check if this is a new exchange
         } else if (exchange.getStatus() == Status.Active && exchange.getRole() == Role.Provider &&
@@ -119,6 +137,7 @@
             // Decrement references to source and destination SA
             unreference(ie.getSource());
             unreference(ie.getDestination());
+            pending.remove(exchange);
         }
     }
 
@@ -137,6 +156,14 @@
             }
         }
     }
+    
+    public void cancelPendingSyncExchanges(ServiceAssembly assembly) {
+        if (assembly != null) {
+            for (Exchange exchange : getPending(assembly)) {
+                exchange.cancel();
+            }
+        }
+    }
 
     private void reference(InternalEndpoint endpoint) {
         if (endpoint != null) {
@@ -173,4 +200,14 @@
         }
     }
 
+    protected Set<InternalExchange> getPending(ServiceAssembly assembly) {
+        Set<InternalExchange> result = new HashSet<InternalExchange>();
+        for (InternalExchange exchange : pending.keySet()) {
+            if (pending.get(exchange).equals(assembly)) {
+                result.add(exchange);
+            }
+        }
+        return result;
+    }
+
 }

Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java Tue Mar 10 19:51:08 2009
@@ -21,6 +21,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import javax.jbi.JBIException;
 import javax.jbi.management.LifeCycleMBean;
@@ -76,6 +78,8 @@
     // map of wires and the matching OSGi ServiceRegistration
     private Map<Wire, ServiceRegistration> wires = new HashMap<Wire, ServiceRegistration>();
 
+    private int shutdownTimeout;
+
     public ServiceAssemblyImpl(Bundle bundle,
                                ServiceAssemblyDesc serviceAssemblyDesc,
                                List<ServiceUnitImpl> serviceUnits,
@@ -117,6 +121,10 @@
     public List<ServiceUnitImpl> getServiceUnitsList() {
         return serviceUnits;
     }
+    
+    public void setShutdownTimeout(int shutdownTimeout) {
+        this.shutdownTimeout = shutdownTimeout;
+    }
 
     public synchronized void init() throws JBIException {
         checkComponentsStarted();
@@ -200,6 +208,7 @@
 
     public synchronized void shutDown(boolean persist, boolean force) throws JBIException {
         listener.setAssembly(this);
+        final Semaphore semaphore = force && shutdownTimeout > 0 ? startShutdownMonitorThread() : null;
         try {
             if (state == State.Shutdown) {
                 return;
@@ -225,6 +234,11 @@
         } finally {
             listener.setAssembly(null);
             listener.forget(this);
+            
+            //notify the shutdown monitor thread that things ended correctly
+            if (semaphore != null) {
+                semaphore.release();
+            }
         }
     }
 
@@ -303,4 +317,28 @@
         return bundle.getBundleContext().registerService(Wire.class.getName(), 
                                                          wire, new MapToDictionary(wire.getFrom()));
     }
+    
+    /*
+     * Start the shutdown monitor thread and return a semaphore to notify the thread of a clean shutdown
+     */
+    private Semaphore startShutdownMonitorThread() {
+        final Semaphore semaphore = new Semaphore(0);
+        Thread thread = new Thread(getName()  + " - Shutdown Monitor Thread") {
+            @Override
+            public void run() {
+                try {
+                    LOGGER.debug("Waiting for " + shutdownTimeout + " milliseconds to a clean shutdown of SA " + ServiceAssemblyImpl.this.getName());
+                    if (!semaphore.tryAcquire(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+                        LOGGER.warn("Unable to do a clean shutdown of SA " + ServiceAssemblyImpl.this.getName() + ", canceling all sync exchanges");
+                        listener.cancelPendingSyncExchanges(ServiceAssemblyImpl.this);                        
+                    }
+                } catch (InterruptedException e) {
+                    //let's assume things went OK if we got interrupted
+                }
+            }
+        };
+        thread.setDaemon(true);
+        thread.start();
+        return semaphore;
+    }
 }

Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java Tue Mar 10 19:51:08 2009
@@ -112,6 +112,8 @@
     private ServiceTracker deployedAssembliesTracker;
 
     private AssemblyReferencesListener endpointListener;
+    
+    private int shutdownTimeout;
 
     // Helper beans
     private NamingStrategy namingStrategy;
@@ -216,6 +218,15 @@
     public ServiceAssemblyImpl getServiceAssembly(String name) {
         return name != null ? serviceAssemblies.get(name) : null;
     }
+    
+    public void setShutdownTimeout(int shutdownTimeout) {
+        System.out.println("Shutting down in " + shutdownTimeout);
+        this.shutdownTimeout = shutdownTimeout;
+    }
+    
+    public int getShutdownTimeout() {
+        return shutdownTimeout;
+    }
 
     public void afterPropertiesSet() throws Exception {
         // Track bundles
@@ -436,6 +447,7 @@
         // Now create the SA and initialize it
         Preferences prefs = preferencesService.getUserPreferences(serviceAssemblyDesc.getIdentification().getName());
         ServiceAssemblyImpl sa = new ServiceAssemblyImpl(bundle, serviceAssemblyDesc, sus, prefs, endpointListener, autoStart);
+        sa.setShutdownTimeout(shutdownTimeout);
         sa.setListenerRegistry(listenerRegistry);
         sa.init();
         serviceAssemblies.put(sa.getName(), sa);

Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml Tue Mar 10 19:51:08 2009
@@ -18,16 +18,22 @@
 
 -->
 <beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:ctx="http://www.springframework.org/schema/context"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:osgi="http://www.springframework.org/schema/osgi"
+       xmlns:osgix="http://www.springframework.org/schema/osgi-compendium"
        xmlns:util="http://www.springframework.org/schema/util"
        xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans.xsd
+  http://www.springframework.org/schema/context
+  http://www.springframework.org/schema/context/spring-context.xsd
   http://www.springframework.org/schema/util
   http://www.springframework.org/schema/util/spring-util.xsd
   http://www.springframework.org/schema/osgi
-  http://www.springframework.org/schema/osgi/spring-osgi.xsd">
+  http://www.springframework.org/schema/osgi/spring-osgi.xsd
+  http://www.springframework.org/schema/osgi-compendium
+  http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd">
 
     <!-- JBI Deployer -->
     <bean id="deployer" class="org.apache.servicemix.jbi.deployer.impl.Deployer">
@@ -37,6 +43,7 @@
         <property name="namingStrategy" ref="namingStrategy" />
         <property name="managementAgent" ref="managementAgent" />
         <property name="preferencesService" ref="preferencesService" />
+        <property name="shutdownTimeout" value="${shutdownTimeout}" />
     </bean>
 
     <!-- Endpoint listener -->
@@ -118,5 +125,11 @@
     <!-- Preferences Service -->
     <osgi:reference id="preferencesService"
                     interface="org.osgi.service.prefs.PreferencesService" />
+                    
+    <osgix:cm-properties id="cmProps" persistent-id="org.apache.servicemix.jbi">
+        <prop key="shutdownTimeout">0</prop>
+    </osgix:cm-properties>
+
+    <ctx:property-placeholder properties-ref="cmProps" />
 
 </beans>

Added: servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java?rev=752231&view=auto
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java (added)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java Tue Mar 10 19:51:08 2009
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.deployer.artifacts;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.util.Collections;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.jbi.deployer.artifacts.AbstractLifecycleJbiArtifact.State;
+import org.apache.servicemix.jbi.deployer.descriptor.DescriptorFactory;
+import org.apache.servicemix.jbi.deployer.descriptor.ServiceAssemblyDesc;
+import org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl;
+import org.apache.servicemix.jbi.runtime.impl.EndpointImpl;
+import org.apache.servicemix.nmr.api.Endpoint;
+import org.apache.servicemix.nmr.api.Pattern;
+import org.apache.servicemix.nmr.api.Status;
+import org.apache.servicemix.nmr.api.internal.InternalEndpoint;
+import org.apache.servicemix.nmr.api.internal.InternalExchange;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import org.apache.servicemix.nmr.core.ExchangeImpl;
+import org.apache.servicemix.nmr.core.InternalEndpointWrapper;
+import org.osgi.service.prefs.Preferences;
+
+/**
+ * Test cases for {@link AssemblyReferencesListener}
+ */
+public class AssemblyReferencesListenerTest extends TestCase {
+
+    public void testKeepTrackOfSyncExchangesWhenDone() throws Exception {
+        ServiceAssemblyImpl sa = createServiceAssembly();
+        
+        AssemblyReferencesListener listener = new AssemblyReferencesListener();
+        listener.setAssembly(sa);
+        
+        InternalEndpoint endpoint = new InternalEndpointWrapper(new EndpointImpl(ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "endpoint")),
+                                                                ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "internal-endpoint"));
+        listener.endpointRegistered(endpoint);
+        listener.setAssembly(null);
+        
+        InternalExchange exchange = new ExchangeImpl(Pattern.InOnly);
+        exchange.setSource(endpoint);
+        exchange.setProperty(DeliveryChannelImpl.SEND_SYNC, Boolean.TRUE);
+        listener.exchangeSent(exchange);
+        
+        assertEquals(1, listener.getPending(sa).size());
+        
+        exchange.setStatus(Status.Done);
+        listener.exchangeDelivered(exchange);
+        
+        assertEquals(0, listener.getPending(sa).size());
+    }
+    
+    public void testKeepTrackOfSyncExchangesWhenFailed() throws Exception {
+        ServiceAssemblyImpl sa = createServiceAssembly();
+        
+        AssemblyReferencesListener listener = new AssemblyReferencesListener();
+        listener.setAssembly(sa);
+        
+        InternalEndpoint endpoint = new InternalEndpointWrapper(new EndpointImpl(ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "endpoint")),
+                                                                ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "internal-endpoint"));
+        listener.endpointRegistered(endpoint);
+        listener.setAssembly(null);
+        
+        InternalExchange exchange = new ExchangeImpl(Pattern.InOnly);
+        exchange.setSource(endpoint);
+        exchange.setProperty(DeliveryChannelImpl.SEND_SYNC, Boolean.TRUE);
+        listener.exchangeSent(exchange);
+        
+        assertEquals(1, listener.getPending(sa).size());
+        
+        exchange.setStatus(Status.Error);
+        listener.exchangeFailed(exchange);
+        
+        assertEquals(0, listener.getPending(sa).size());
+    }
+    
+    public void testCancelPendingExchanges() throws Exception {
+        ServiceAssemblyImpl sa = createServiceAssembly();
+        
+        AssemblyReferencesListener listener = new AssemblyReferencesListener();
+        listener.setAssembly(sa);
+        
+        InternalEndpoint endpoint = new InternalEndpointWrapper(new EndpointImpl(ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "endpoint")),
+                                                                ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "internal-endpoint"));
+        listener.endpointRegistered(endpoint);
+        listener.setAssembly(null);
+        
+        InternalExchange exchange = new ExchangeImpl(Pattern.InOnly);
+        exchange.setSource(endpoint);
+        exchange.setProperty(DeliveryChannelImpl.SEND_SYNC, Boolean.TRUE);
+        exchange.getConsumerLock(true);
+        listener.exchangeSent(exchange);
+        
+        assertEquals(1, listener.getPending(sa).size());
+        
+        listener.cancelPendingSyncExchanges(sa);
+        assertEquals(Status.Error, exchange.getStatus());
+    }
+
+    private ServiceAssemblyImpl createServiceAssembly() {
+        ServiceAssemblyDesc descriptor = DescriptorFactory.buildDescriptor(DescriptorFactory.class.getResource("serviceAssembly.xml")).getServiceAssembly();
+        final Preferences prefs = createMock(Preferences.class);
+        expect(prefs.get("state", State.Shutdown.name())).andReturn(State.Shutdown.name()).anyTimes();
+        replay(prefs);
+
+        ComponentImpl comp = new ComponentImpl(null, null, null, prefs, false, null);
+        ServiceUnitImpl su = new ServiceUnitImpl(descriptor.getServiceUnits()[0], null, comp);
+        ServiceAssemblyImpl sa = new ServiceAssemblyImpl(null, descriptor, Collections.singletonList(su), prefs, new AssemblyReferencesListener(), false);
+        return sa;
+    }
+}

Propchange: servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native