You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/03/10 20:51:09 UTC
svn commit: r752231 - in /servicemix/smx4/nmr/trunk/jbi/deployer/src:
main/java/org/apache/servicemix/jbi/deployer/artifacts/
main/java/org/apache/servicemix/jbi/deployer/impl/
main/resources/META-INF/spring/
test/java/org/apache/servicemix/jbi/deployer/artifacts/
Author: gertv
Date: Tue Mar 10 19:51:08 2009
New Revision: 752231
URL: http://svn.apache.org/viewvc?rev=752231&view=rev
Log:
SMX4NMR-118: Add a shutdown monitor thread to the SA with a configurable timeout
Added:
servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java (with props)
Modified:
servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java
servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java
servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java
servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml
Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListener.java Tue Mar 10 19:51:08 2009
@@ -16,11 +16,15 @@
*/
package org.apache.servicemix.jbi.deployer.artifacts;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.servicemix.jbi.deployer.ServiceAssembly;
+import org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl;
+import org.apache.servicemix.nmr.api.Endpoint;
import org.apache.servicemix.nmr.api.Exchange;
import org.apache.servicemix.nmr.api.Role;
import org.apache.servicemix.nmr.api.Status;
@@ -43,6 +47,7 @@
private final ConcurrentMap<InternalEndpoint, ServiceAssembly> endpoints = new ConcurrentHashMap<InternalEndpoint, ServiceAssembly>();
private final ConcurrentMap<ServiceAssembly, AtomicInteger> references = new ConcurrentHashMap<ServiceAssembly, AtomicInteger>();
private final ConcurrentMap<ServiceAssembly, Object> locks = new ConcurrentHashMap<ServiceAssembly, Object>();
+ private final ConcurrentMap<InternalExchange, ServiceAssembly> pending = new ConcurrentHashMap<InternalExchange, ServiceAssembly>();
public void setAssembly(ServiceAssembly assembly) {
this.assembly.set(assembly);
@@ -89,10 +94,22 @@
// Increment reference to the source SA
InternalExchange ie = (InternalExchange) exchange;
reference(ie.getSource());
+ if (isSync(exchange)) {
+ pending(ie);
+ }
}
}
}
+ /**
+  * Checks whether the exchange carries the {@link DeliveryChannelImpl#SEND_SYNC} property with a true value.
+  *
+  * @param exchange the exchange to inspect
+  * @return <code>true</code> if the property is present and its Boolean value is <code>true</code>
+  */
+ private boolean isSync(Exchange exchange) {
+     // no property at all: the exchange is not treated as synchronous
+     if (exchange.getProperty(DeliveryChannelImpl.SEND_SYNC) == null) {
+         return false;
+     }
+     Boolean sync = exchange.getProperty(DeliveryChannelImpl.SEND_SYNC, Boolean.class);
+     return sync.booleanValue();
+ }
+
+ /**
+  * Records the exchange as pending for the service assembly that owns its source endpoint.
+  * Exchanges without a source endpoint, or whose source endpoint is not known to belong to a
+  * service assembly, are not tracked: the underlying concurrent maps accept neither null keys
+  * nor null values.
+  *
+  * @param exchange the exchange to keep track of
+  */
+ private void pending(InternalExchange exchange) {
+     InternalEndpoint source = exchange.getSource();
+     if (source == null) {
+         return;
+     }
+     ServiceAssembly assembly = endpoints.get(source);
+     if (assembly != null) {
+         pending.put(exchange, assembly);
+     }
+ }
+
public void exchangeDelivered(Exchange exchange) {
// Check if the exchange is finished
if (exchange.getStatus() != Status.Active) {
@@ -101,6 +118,7 @@
// Decrement references to source and destination SA
unreference(ie.getSource());
unreference(ie.getDestination());
+ pending.remove(exchange);
}
// Check if this is a new exchange
} else if (exchange.getStatus() == Status.Active && exchange.getRole() == Role.Provider &&
@@ -119,6 +137,7 @@
// Decrement references to source and destination SA
unreference(ie.getSource());
unreference(ie.getDestination());
+ pending.remove(exchange);
}
}
@@ -137,6 +156,14 @@
}
}
}
+
+ /**
+  * Calls {@link Exchange#cancel()} on every exchange currently recorded as pending for the given service assembly.
+  *
+  * @param assembly the service assembly whose pending exchanges have to be cancelled; <code>null</code> is ignored
+  */
+ public void cancelPendingSyncExchanges(ServiceAssembly assembly) {
+     if (assembly == null) {
+         return;
+     }
+     for (Exchange pendingExchange : getPending(assembly)) {
+         pendingExchange.cancel();
+     }
+ }
private void reference(InternalEndpoint endpoint) {
if (endpoint != null) {
@@ -173,4 +200,14 @@
}
}
+ /**
+  * Collects the exchanges currently recorded as pending for the given service assembly.
+  *
+  * @param assembly the service assembly to look for
+  * @return a new set holding the matching exchanges (empty if there are none)
+  */
+ protected Set<InternalExchange> getPending(ServiceAssembly assembly) {
+     Set<InternalExchange> result = new HashSet<InternalExchange>();
+     for (InternalExchange exchange : pending.keySet()) {
+         // the entry can be removed by another thread between the iteration and this
+         // look-up, so the value has to be null-checked before it is compared
+         ServiceAssembly owner = pending.get(exchange);
+         if (owner != null && owner.equals(assembly)) {
+             result.add(exchange);
+         }
+     }
+     return result;
+ }
+
}
Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/artifacts/ServiceAssemblyImpl.java Tue Mar 10 19:51:08 2009
@@ -21,6 +21,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import javax.jbi.JBIException;
import javax.jbi.management.LifeCycleMBean;
@@ -76,6 +78,8 @@
// map of wires and the matching OSGi ServiceRegistration
private Map<Wire, ServiceRegistration> wires = new HashMap<Wire, ServiceRegistration>();
+ private int shutdownTimeout;
+
public ServiceAssemblyImpl(Bundle bundle,
ServiceAssemblyDesc serviceAssemblyDesc,
List<ServiceUnitImpl> serviceUnits,
@@ -117,6 +121,10 @@
public List<ServiceUnitImpl> getServiceUnitsList() {
return serviceUnits;
}
+
+ /**
+  * Sets the time, in milliseconds, the shutdown monitor thread waits for a forced shutdown to
+  * finish before it cancels the pending sync exchanges. The monitor thread is only started
+  * for values greater than 0.
+  *
+  * @param shutdownTimeout the timeout in milliseconds
+  */
+ public void setShutdownTimeout(final int shutdownTimeout) {
+     this.shutdownTimeout = shutdownTimeout;
+ }
public synchronized void init() throws JBIException {
checkComponentsStarted();
@@ -200,6 +208,7 @@
public synchronized void shutDown(boolean persist, boolean force) throws JBIException {
listener.setAssembly(this);
+ final Semaphore semaphore = force && shutdownTimeout > 0 ? startShutdownMonitorThread() : null;
try {
if (state == State.Shutdown) {
return;
@@ -225,6 +234,11 @@
} finally {
listener.setAssembly(null);
listener.forget(this);
+
+ //notify the shutdown monitor thread that things ended correctly
+ if (semaphore != null) {
+ semaphore.release();
+ }
}
}
@@ -303,4 +317,28 @@
return bundle.getBundleContext().registerService(Wire.class.getName(),
wire, new MapToDictionary(wire.getFrom()));
}
+
+ /**
+  * Start the shutdown monitor thread and return a semaphore to notify the thread of a clean shutdown.
+  * The daemon thread waits up to the configured timeout for a permit on the semaphore; if none
+  * arrives in time, it asks the listener to cancel the pending sync exchanges of this SA.
+  *
+  * @return the semaphore the caller has to release once the shutdown has ended
+  */
+ private Semaphore startShutdownMonitorThread() {
+     final Semaphore semaphore = new Semaphore(0);
+     // read the timeout once so the logged and the awaited value cannot diverge
+     final int timeout = shutdownTimeout;
+     Thread thread = new Thread(getName() + " - Shutdown Monitor Thread") {
+         @Override
+         public void run() {
+             try {
+                 LOGGER.debug("Waiting for " + timeout + " milliseconds to a clean shutdown of SA " + ServiceAssemblyImpl.this.getName());
+                 if (!semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
+                     LOGGER.warn("Unable to do a clean shutdown of SA " + ServiceAssemblyImpl.this.getName() + ", canceling all sync exchanges");
+                     listener.cancelPendingSyncExchanges(ServiceAssemblyImpl.this);
+                 }
+             } catch (InterruptedException e) {
+                 // let's assume things went OK if we got interrupted: nothing is cancelled,
+                 // but the interrupt flag is restored instead of being silently dropped
+                 Thread.currentThread().interrupt();
+             }
+         }
+     };
+     thread.setDaemon(true);
+     thread.start();
+     return semaphore;
+ }
}
Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/java/org/apache/servicemix/jbi/deployer/impl/Deployer.java Tue Mar 10 19:51:08 2009
@@ -112,6 +112,8 @@
private ServiceTracker deployedAssembliesTracker;
private AssemblyReferencesListener endpointListener;
+
+ private int shutdownTimeout;
// Helper beans
private NamingStrategy namingStrategy;
@@ -216,6 +218,15 @@
public ServiceAssemblyImpl getServiceAssembly(String name) {
return name != null ? serviceAssemblies.get(name) : null;
}
+
+ /**
+  * Sets the shutdown timeout that is handed to every service assembly created by this deployer.
+  *
+  * @param shutdownTimeout the timeout value to pass on to the service assemblies
+  */
+ public void setShutdownTimeout(int shutdownTimeout) {
+     this.shutdownTimeout = shutdownTimeout;
+ }
+
+ /**
+  * Returns the shutdown timeout currently configured on this deployer.
+  *
+  * @return the value last passed to {@link #setShutdownTimeout(int)}
+  */
+ public int getShutdownTimeout() {
+     return this.shutdownTimeout;
+ }
public void afterPropertiesSet() throws Exception {
// Track bundles
@@ -436,6 +447,7 @@
// Now create the SA and initialize it
Preferences prefs = preferencesService.getUserPreferences(serviceAssemblyDesc.getIdentification().getName());
ServiceAssemblyImpl sa = new ServiceAssemblyImpl(bundle, serviceAssemblyDesc, sus, prefs, endpointListener, autoStart);
+ sa.setShutdownTimeout(shutdownTimeout);
sa.setListenerRegistry(listenerRegistry);
sa.init();
serviceAssemblies.put(sa.getName(), sa);
Modified: servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml?rev=752231&r1=752230&r2=752231&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml (original)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/main/resources/META-INF/spring/servicemix-jbi-deployer.xml Tue Mar 10 19:51:08 2009
@@ -18,16 +18,22 @@
-->
<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:ctx="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:osgi="http://www.springframework.org/schema/osgi"
+ xmlns:osgix="http://www.springframework.org/schema/osgi-compendium"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/context
+ http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/osgi
- http://www.springframework.org/schema/osgi/spring-osgi.xsd">
+ http://www.springframework.org/schema/osgi/spring-osgi.xsd
+ http://www.springframework.org/schema/osgi-compendium
+ http://www.springframework.org/schema/osgi-compendium/spring-osgi-compendium.xsd">
<!-- JBI Deployer -->
<bean id="deployer" class="org.apache.servicemix.jbi.deployer.impl.Deployer">
@@ -37,6 +43,7 @@
<property name="namingStrategy" ref="namingStrategy" />
<property name="managementAgent" ref="managementAgent" />
<property name="preferencesService" ref="preferencesService" />
+ <property name="shutdownTimeout" value="${shutdownTimeout}" />
</bean>
<!-- Endpoint listener -->
@@ -118,5 +125,11 @@
<!-- Preferences Service -->
<osgi:reference id="preferencesService"
interface="org.osgi.service.prefs.PreferencesService" />
+
+ <osgix:cm-properties id="cmProps" persistent-id="org.apache.servicemix.jbi">
+ <prop key="shutdownTimeout">0</prop>
+ </osgix:cm-properties>
+
+ <ctx:property-placeholder properties-ref="cmProps" />
</beans>
Added: servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java?rev=752231&view=auto
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java (added)
+++ servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java Tue Mar 10 19:51:08 2009
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.deployer.artifacts;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.util.Collections;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.jbi.deployer.artifacts.AbstractLifecycleJbiArtifact.State;
+import org.apache.servicemix.jbi.deployer.descriptor.DescriptorFactory;
+import org.apache.servicemix.jbi.deployer.descriptor.ServiceAssemblyDesc;
+import org.apache.servicemix.jbi.runtime.impl.DeliveryChannelImpl;
+import org.apache.servicemix.jbi.runtime.impl.EndpointImpl;
+import org.apache.servicemix.nmr.api.Endpoint;
+import org.apache.servicemix.nmr.api.Pattern;
+import org.apache.servicemix.nmr.api.Status;
+import org.apache.servicemix.nmr.api.internal.InternalEndpoint;
+import org.apache.servicemix.nmr.api.internal.InternalExchange;
+import org.apache.servicemix.nmr.api.service.ServiceHelper;
+import org.apache.servicemix.nmr.core.ExchangeImpl;
+import org.apache.servicemix.nmr.core.InternalEndpointWrapper;
+import org.osgi.service.prefs.Preferences;
+
+/**
+ * Test cases for {@link AssemblyReferencesListener}: tracking of pending sync exchanges
+ * and cancelling them for a service assembly.
+ */
+public class AssemblyReferencesListenerTest extends TestCase {
+
+    /** A sync exchange is no longer pending once it has been delivered with status Done. */
+    public void testKeepTrackOfSyncExchangesWhenDone() throws Exception {
+        ServiceAssemblyImpl sa = createServiceAssembly();
+        AssemblyReferencesListener listener = new AssemblyReferencesListener();
+        InternalEndpoint endpoint = registerEndpoint(listener, sa);
+
+        InternalExchange exchange = createSyncExchange(endpoint);
+        listener.exchangeSent(exchange);
+        assertEquals(1, listener.getPending(sa).size());
+
+        exchange.setStatus(Status.Done);
+        listener.exchangeDelivered(exchange);
+        assertEquals(0, listener.getPending(sa).size());
+    }
+
+    /** A sync exchange is no longer pending once it has been reported as failed. */
+    public void testKeepTrackOfSyncExchangesWhenFailed() throws Exception {
+        ServiceAssemblyImpl sa = createServiceAssembly();
+        AssemblyReferencesListener listener = new AssemblyReferencesListener();
+        InternalEndpoint endpoint = registerEndpoint(listener, sa);
+
+        InternalExchange exchange = createSyncExchange(endpoint);
+        listener.exchangeSent(exchange);
+        assertEquals(1, listener.getPending(sa).size());
+
+        exchange.setStatus(Status.Error);
+        listener.exchangeFailed(exchange);
+        assertEquals(0, listener.getPending(sa).size());
+    }
+
+    /** Cancelling the pending sync exchanges of an SA leaves the exchange in status Error. */
+    public void testCancelPendingExchanges() throws Exception {
+        ServiceAssemblyImpl sa = createServiceAssembly();
+        AssemblyReferencesListener listener = new AssemblyReferencesListener();
+        InternalEndpoint endpoint = registerEndpoint(listener, sa);
+
+        InternalExchange exchange = createSyncExchange(endpoint);
+        exchange.getConsumerLock(true);
+        listener.exchangeSent(exchange);
+        assertEquals(1, listener.getPending(sa).size());
+
+        listener.cancelPendingSyncExchanges(sa);
+        assertEquals(Status.Error, exchange.getStatus());
+    }
+
+    /*
+     * Registers a new endpoint with the listener while the SA is set as the current assembly
+     */
+    private InternalEndpoint registerEndpoint(AssemblyReferencesListener listener, ServiceAssemblyImpl sa) {
+        listener.setAssembly(sa);
+        InternalEndpoint endpoint = new InternalEndpointWrapper(new EndpointImpl(ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "endpoint")),
+                                                                ServiceHelper.createMap(Endpoint.ENDPOINT_NAME, "internal-endpoint"));
+        listener.endpointRegistered(endpoint);
+        listener.setAssembly(null);
+        return endpoint;
+    }
+
+    /*
+     * Creates an InOnly exchange originating from the given endpoint and flagged as a sync send
+     */
+    private InternalExchange createSyncExchange(InternalEndpoint source) {
+        InternalExchange exchange = new ExchangeImpl(Pattern.InOnly);
+        exchange.setSource(source);
+        exchange.setProperty(DeliveryChannelImpl.SEND_SYNC, Boolean.TRUE);
+        return exchange;
+    }
+
+    /*
+     * Builds a service assembly from serviceAssembly.xml, backed by mock preferences reporting the Shutdown state
+     */
+    private ServiceAssemblyImpl createServiceAssembly() {
+        ServiceAssemblyDesc descriptor = DescriptorFactory.buildDescriptor(DescriptorFactory.class.getResource("serviceAssembly.xml")).getServiceAssembly();
+
+        final Preferences prefs = createMock(Preferences.class);
+        expect(prefs.get("state", State.Shutdown.name())).andReturn(State.Shutdown.name()).anyTimes();
+        replay(prefs);
+
+        ComponentImpl comp = new ComponentImpl(null, null, null, prefs, false, null);
+        ServiceUnitImpl su = new ServiceUnitImpl(descriptor.getServiceUnits()[0], null, comp);
+        return new ServiceAssemblyImpl(null, descriptor, Collections.singletonList(su), prefs, new AssemblyReferencesListener(), false);
+    }
+}
Propchange: servicemix/smx4/nmr/trunk/jbi/deployer/src/test/java/org/apache/servicemix/jbi/deployer/artifacts/AssemblyReferencesListenerTest.java
------------------------------------------------------------------------------
svn:eol-style = native