You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mi...@apache.org on 2009/03/13 20:18:09 UTC

svn commit: r753354 - in /ode/branches/APACHE_ODE_1.X: axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java utils/src/main/java/org/apache/ode/utils/WatchDog.java

Author: midon
Date: Fri Mar 13 19:18:04 2009
New Revision: 753354

URL: http://svn.apache.org/viewvc?rev=753354&view=rev
Log:
ODE-549: lock for watchdog

Modified:
    ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java
    ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java
    ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/WatchDog.java

Modified: ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java?rev=753354&r1=753353&r2=753354&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/SoapExternalService.java Fri Mar 13 19:18:04 2009
@@ -73,6 +73,8 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Acts as a service not provided by ODE. Used mainly for invocation as a way to maintain the WSDL description of used
@@ -167,7 +169,7 @@
                     axisEPR.setAddress(address);
                 }
             }else{
-                if (__log.isDebugEnabled()) __log.debug("Endpoint URL overridden by process. "+endpointUrl+" => "+mexEndpointUrl);                
+                if (__log.isDebugEnabled()) __log.debug("Endpoint URL overridden by process. "+endpointUrl+" => "+mexEndpointUrl);
             }
 
             if (__log.isDebugEnabled()) {
@@ -265,9 +267,9 @@
             serviceClient = new ServiceClient(_configContext, null);
             _cachedClients.set(serviceClient);
         }
-        AxisService anonymousService = _axisServiceWatchDog.getObserver().anonymousService;
+        AxisService anonymousService = _axisServiceWatchDog.getObserver().get();
         serviceClient.setAxisService(anonymousService);
-        serviceClient.setOptions(_axisOptionsWatchDog.getObserver().options);
+        serviceClient.setOptions(_axisOptionsWatchDog.getObserver().get());
 
         applySecuritySettings(serviceClient);
 
@@ -328,8 +330,8 @@
         WSAEndpoint targetWSAEPR = EndpointFactory.convertToWSA((MutableEndpoint) odeMex.getEndpointReference());
         WSAEndpoint myRoleWSAEPR = EndpointFactory.convertToWSA((MutableEndpoint) odeMex.getMyRoleEndpointReference());
         WSAEndpoint targetEPR = new WSAEndpoint(targetWSAEPR);
-        
-        EndpointReference replyEPR = null; 
+
+        EndpointReference replyEPR = null;
 
         String partnerSessionId = odeMex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
         String myRoleSessionId = odeMex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
@@ -356,7 +358,7 @@
             // Map My Session ID to JMS Correlation ID
             Document callbackEprXml = odeMex.getMyRoleEndpointReference().toXML();
             Element serviceElement = callbackEprXml.getDocumentElement();
-            
+
             if (myRoleSessionId != null) {
                 options.setProperty(JMSConstants.JMS_COORELATION_ID, myRoleSessionId);
             } else {
@@ -365,7 +367,7 @@
                 }
             }
 
-            Element address = DOMUtils.findChildByName(serviceElement, 
+            Element address = DOMUtils.findChildByName(serviceElement,
                     new QName(Namespaces.WS_ADDRESSING_NS, "Address"), true);
             if (__log.isDebugEnabled()) {
                 __log.debug("The system-defined wsa address is : "
@@ -533,31 +535,26 @@
      * The {@link org.apache.axis2.client.ServiceClient} instance is created from the main Axis2 config instance and
      * this service-specific config file.
      */
-    private class ServiceFileObserver extends WatchDog.DefaultObserver {
-        String serviceName = "anonymous_service_" + new GUID().toString();
-        AxisService anonymousService;
+    private class ServiceFileObserver extends WatchDog.DefaultObserver<AxisService> {
+        String serviceName = "axis_service_for_" + _serviceName + "#" + _portName + "_" + new GUID().toString();
         File file;
 
         private ServiceFileObserver(File file) {
             this.file = file;
         }
 
-        public boolean isInitialized() {
-            return anonymousService != null;
-        }
-
         public void init() {
-            // create an anonymous axis service that will be used by the ServiceClient
+            // create an anonymous axis service that will be used by the ServiceClient
             // this service will be added to the AxisConfig so do not reuse the name of the external service
             // as it could blow up if the service is deployed in the same axis2 instance
-            anonymousService = new AxisService(serviceName);
-            anonymousService.setParent(_axisConfig);
+            object = new AxisService(serviceName);
+            object.setParent(_axisConfig);
 
             OutOnlyAxisOperation outOnlyOperation = new OutOnlyAxisOperation(ServiceClient.ANON_OUT_ONLY_OP);
-            anonymousService.addOperation(outOnlyOperation);
+            object.addOperation(outOnlyOperation);
 
             OutInAxisOperation outInOperation = new OutInAxisOperation(ServiceClient.ANON_OUT_IN_OP);
-            anonymousService.addOperation(outInOperation);
+            object.addOperation(outInOperation);
 
             // set a right default action *after* operations have been added to the service.
             outOnlyOperation.setSoapAction("");
@@ -570,9 +567,9 @@
             // and load the new config.
             init(); // create a new ServiceClient instance
             try {
-                AxisUtils.configureService(_configContext, anonymousService, file.toURI().toURL());
+                AxisUtils.configureService(_configContext, object, file.toURI().toURL());
                 // do not allow the service.xml file to change the service name
-                anonymousService.setName(serviceName);
+                object.setName(serviceName);
             } catch (Exception e) {
                 if (__log.isWarnEnabled()) __log.warn("Exception while configuring service: " + _serviceName, e);
                 throw new RuntimeException("Exception while configuring service: " + _serviceName, e);
@@ -580,23 +577,17 @@
         }
     }
 
-    private class OptionsObserver extends WatchDog.DefaultObserver {
-
-        Options options;
-
-        public boolean isInitialized() {
-            return options != null;
-        }
+    private class OptionsObserver extends WatchDog.DefaultObserver<Options> {
 
         public void init() {
-            options = new Options();
+            object = new Options();
             // set defaults values
-            options.setExceptionToBeThrownOnSOAPFault(false);
+            object.setExceptionToBeThrownOnSOAPFault(false);
 
             // this value does NOT override Properties.PROP_HTTP_CONNECTION_TIMEOUT
             // nor Properties.PROP_HTTP_SOCKET_TIMEOUT.
             // it will be applied only if the laters are not set.
-            options.setTimeOutInMilliSeconds(60000);
+            object.setTimeOutInMilliSeconds(60000);
         }
 
         public void onUpdate() {
@@ -604,7 +595,7 @@
 
             // note: don't make this map an instance attribute, so we always get the latest version
             final Map<String, String> properties = _pconf.getEndpointProperties(endpointReference);
-            Properties.Axis2.translate(properties, options);
+            Properties.Axis2.translate(properties, object);
         }
     }
 
@@ -625,7 +616,7 @@
         }
 
         public String toString() {
-            return "Properties for Endpoint: " + endpointReference;
+            return "Properties for Endpoint: " + _serviceName + "#" + _portName;
         }
     }
 

Modified: ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java?rev=753354&r1=753353&r2=753354&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java Fri Mar 13 19:18:04 2009
@@ -93,11 +93,8 @@
     // cache the inMemory flag because XMLBeans objects are heavily synchronized (guarded by a coarse-grained lock)
     private volatile boolean _inMemory = false;
 
-    // provide the IL properties
-    private HierarchicalProperties ilProperties;
     // monitor the IL property file and reload it if necessary
     private WatchDog<Map<File, Long>, PropertiesObserver> propertiesWatchDog;
-    private final ReadWriteLock ilPropertiesLock = new ReentrantReadWriteLock();
 
     private EndpointReferenceContext eprContext;
 
@@ -425,17 +422,7 @@
         // update properties if necessary
         // do it manually to save resources (instead of using a thread)
         propertiesWatchDog.check();
-        if (ilProperties == null) {
-            return Collections.EMPTY_MAP;
-        } else {
-            // take a lock so we can have a consistent snapshot of the properties
-            ilPropertiesLock.readLock().lock();
-            try {
-                return ilProperties.getProperties(service, port);
-            } finally {
-                ilPropertiesLock.readLock().unlock();
-            }
-        }
+        return propertiesWatchDog.getObserver().get().getProperties(service, port);
     }
 
     private class PropertiesMutable implements WatchDog.Mutable<Map<File, Long>> {
@@ -456,43 +443,26 @@
         }
     }
 
-    private class PropertiesObserver implements WatchDog.Observer {
+    private class PropertiesObserver extends WatchDog.DefaultObserver<HierarchicalProperties> {
 
         public void init() {
-            ilPropertiesLock.writeLock().lock();
             try {
-                try {
-                    // do not hold a reference on the file list, so that changes are handled
-                    // and always create a new instance of the HierarchicalProperties
-                    ilProperties = new HierarchicalProperties(collectEndpointConfigFiles());
-                } catch (IOException e) {
-                    throw new ContextException("Integration-Layer Properties cannot be loaded!", e);
-                }
-            } finally {
-                ilPropertiesLock.writeLock().unlock();
+                // do not hold a reference on the file list, so that changes are handled
+                // and always create a new instance of the HierarchicalProperties
+                object = new HierarchicalProperties(collectEndpointConfigFiles());
+            } catch (IOException e) {
+                throw new ContextException("Integration-Layer Properties cannot be loaded!", e);
             }
         }
 
-        public boolean isInitialized() {
-            return ilProperties != null;
-        }
-
         public void onUpdate() {
-            ilPropertiesLock.writeLock().lock();
+            init();
             try {
-                init();
-                try {
-                    ilProperties.loadFiles();
-                } catch (IOException e) {
-                    throw new ContextException("Integration-Layer Properties cannot be loaded!", e);
-                }
-            } finally {
-                ilPropertiesLock.writeLock().unlock();
+                object.loadFiles();
+            } catch (IOException e) {
+                throw new ContextException("Integration-Layer Properties cannot be loaded!", e);
             }
-        }
 
-        public void onDelete() {
-            init();
         }
     }
 

Modified: ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/WatchDog.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/WatchDog.java?rev=753354&r1=753353&r2=753354&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/WatchDog.java (original)
+++ ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/WatchDog.java Fri Mar 13 19:18:04 2009
@@ -27,6 +27,9 @@
 import java.util.HashMap;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * This class is based on {@link org.apache.log4j.helpers.FileWatchdog}.<p/>
@@ -112,36 +115,47 @@
     public final void check() {
         long now = System.currentTimeMillis();
         if (expire <= now) {
+            /* get a lock on the observer right now
+             It would be overkill to lock before testing the dates.
+             By locking after the comparison, the worst-case scenario is that 2 threads get into the "if":
+             thread A gets the lock, thread B waits for it. Once the lock is released, thread B acquires it and does the checks on the mutable.
+             So this scenario is *harmless*.
+             */
+            observer.getLock().lock();
             expire = now + delay;
-            if (log.isDebugEnabled()) log.debug("["+mutable+"]"+ " check for changes");
-            if (mutable.exists()) {
-                existedBefore = true;
-                if (lastModif == null || mutable.hasChangedSince(lastModif)) {
-                    lastModif = mutable.lastModified();
-                    observer.onUpdate();
-                    if (log.isInfoEnabled()) log.info("["+mutable+"]"+" updated");
-                    warnedAlready = false;
-                }else{
-                    if (log.isDebugEnabled()) log.debug("["+mutable+"]"+" has not changed");
-                }
-            } else if (!observer.isInitialized()) {
-                // no resource and first time
-                observer.init();
-                if (log.isInfoEnabled()) log.info("["+mutable+"]"+ " initialized");
-            } else {
-                if (existedBefore) {
-                    existedBefore = false;
-                    lastModif = null;
-                    observer.onDelete();
-                    if (log.isInfoEnabled()) log.info("["+mutable+"]"+ " deleted");
-                }
-                if (!warnedAlready) {
-                    warnedAlready = true;
-                    if (log.isInfoEnabled()) log.info("["+mutable+"]"+" does not exist.");
+            try {
+                if (log.isDebugEnabled()) log.debug("[" + mutable + "]" + " check for changes");
+                if (mutable.exists()) {
+                    existedBefore = true;
+                    if (lastModif == null || mutable.hasChangedSince(lastModif)) {
+                        lastModif = mutable.lastModified();
+                        observer.onUpdate();
+                        if (log.isInfoEnabled()) log.info("[" + mutable + "]" + " updated");
+                        warnedAlready = false;
+                    } else {
+                        if (log.isDebugEnabled()) log.debug("[" + mutable + "]" + " has not changed");
+                    }
+                } else if (!observer.isInitialized()) {
+                    // no resource and first time
+                    observer.init();
+                    if (log.isInfoEnabled()) log.info("[" + mutable + "]" + " initialized");
+                } else {
+                    if (existedBefore) {
+                        existedBefore = false;
+                        lastModif = null;
+                        observer.onDelete();
+                        if (log.isInfoEnabled()) log.info("[" + mutable + "]" + " deleted");
+                    }
+                    if (!warnedAlready) {
+                        warnedAlready = true;
+                        if (log.isInfoEnabled()) log.info("[" + mutable + "]" + " does not exist.");
+                    }
                 }
+            } finally {
+                observer.getLock().unlock();
             }
-        }else{
-            if (log.isTraceEnabled()) log.trace("["+mutable+"]"+" wait period is not over");
+        } else {
+            if (log.isTraceEnabled()) log.trace("[" + mutable + "]" + " wait period is not over");
         }
     }
 
@@ -218,11 +232,10 @@
         }
     }
 
-    public interface Observer {
+    public interface Observer<A> {
 
         boolean isInitialized();
 
-
         /**
          * Called by {@link WatchDog#check()} if the underlying object is not {@link #isInitialized initialized} and the {@link WatchDog.Mutable#exists()}  resource does not exist}.
          * <br/> This method might called to reset the underlying object.
@@ -247,19 +260,26 @@
          */
         void onUpdate();
 
+        Lock getLock();
+
+        A get();
+
     }
 
     /**
      * A default implementation of #ChangeHandler. Delete and Update will both invoke the #init method which satifies most use cases.
      * So subclasses may simply override the #init method to fit their own needs.
      */
-    public static class DefaultObserver implements Observer {
+    public static class DefaultObserver<A> implements Observer<A> {
+
+        protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+        protected A object;
 
         /**
-         * @return true
+         * @return true if the wrapped object is not null
          */
         public boolean isInitialized() {
-            return true;
+            return object != null;
         }
 
         /**
@@ -282,5 +302,17 @@
             init();
         }
 
+        public Lock getLock() {
+            return lock.writeLock();
+        }
+
+        public A get() {
+            lock.readLock().lock();
+            try {
+                return object;
+            } finally {
+                lock.readLock().unlock();
+            }
+        }
     }
 }