You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/06/22 10:07:12 UTC

[1/2] aries-rsa git commit: [ARIES-1577] Deadlock in TopologyManagerImport

Repository: aries-rsa
Updated Branches:
  refs/heads/master 7c73a3710 -> 08463cb24


[ARIES-1577] Deadlock in TopologyManagerImport

Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/7627ef98
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/7627ef98
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/7627ef98

Branch: refs/heads/master
Commit: 7627ef9867110cd072ec0a67d1b2642b205156c4
Parents: 7c73a37
Author: Johannes Utzig <j....@seeburger.de>
Authored: Tue Jun 21 11:44:27 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Wed Jun 22 09:28:29 2016 +0200

----------------------------------------------------------------------
 .../importer/TopologyManagerImport.java         | 187 ++++++++++---------
 1 file changed, 98 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/7627ef98/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index e548288..3b98710 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -19,14 +19,14 @@
 package org.apache.aries.rsa.topologymanager.importer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -125,12 +125,10 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         if (importInterestsCounter.remove(filter) == 0) {
             LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
             endpointListenerManager.reduceScope(filter);
-            synchronized (importedServices) {
-                List<ImportRegistration> irs = importedServices.remove(filter);
-                if (irs != null) {
-                    for (ImportRegistration ir : irs) {
-                        ir.close();
-                    }
+            List<ImportRegistration> irs = remove(filter, importedServices);
+            if (irs != null) {
+                for (ImportRegistration ir : irs) {
+                    ir.close();
                 }
             }
         }
@@ -153,39 +151,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
     }
 
     private void addImportPossibility(EndpointDescription endpoint, String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> endpoints = importPossibilities.get(filter);
-            if (endpoints == null) {
-                endpoints = new ArrayList<EndpointDescription>();
-                importPossibilities.put(filter, endpoints);
-            }
-            // prevent adding the same endpoint multiple times, which can happen sometimes,
-            // and which causes imports to remain available even when services are actually down
-            if (!endpoints.contains(endpoint)) {
-                endpoints.add(endpoint);
-            }
-        }
+        put(filter, importPossibilities, endpoint);
     }
 
     private void removeImportPossibility(EndpointDescription endpoint, String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> endpoints = importPossibilities.get(filter);
-            if (endpoints != null) {
-                endpoints.remove(endpoint);
-                if (endpoints.isEmpty()) {
-                    importPossibilities.remove(filter);
-                }
-            }
+        List<EndpointDescription> endpoints = get(filter, importPossibilities);
+        remove(filter, importPossibilities, endpoint);
+        if (endpoints.isEmpty()) {
+            remove(filter,importPossibilities,null);
         }
     }
 
     public void add(RemoteServiceAdmin rsa) {
         rsaSet.add(rsa);
-        synchronized (importPossibilities) {
-            for (String filter : importPossibilities.keySet()) {
-                triggerImport(filter);
-            }
+
+        for (String filter : keySet(importPossibilities)) {
+            triggerImport(filter);
         }
+
     }
     
     public void remove(RemoteServiceAdmin rsa) {
@@ -210,56 +193,34 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
     }
 
     private void unexportNotAvailableServices(String filter) {
-        synchronized (importedServices) {
-            List<ImportRegistration> importRegistrations = importedServices.get(filter);
-            if (importRegistrations != null) {
-                // iterate over a copy
-                for (ImportRegistration ir : new ArrayList<ImportRegistration>(importRegistrations)) {
-                    EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
-                    if (!isImportPossibilityAvailable(endpoint, filter)) {
-                        removeImport(ir, null); // also unexports the service
-                    }
-                }
+        List<ImportRegistration> importRegistrations = get(filter, importedServices);
+        for (ImportRegistration ir : importRegistrations) {
+            EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
+            if (!isImportPossibilityAvailable(endpoint, filter)) {
+                removeImport(ir, null); // also unexports the service
             }
         }
     }
 
     private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> endpoints = importPossibilities.get(filter);
-            return endpoints != null && endpoints.contains(endpoint);
-        }
-    }
+        List<EndpointDescription> endpoints = get(filter, importPossibilities);
+        return endpoints != null && endpoints.contains(endpoint);
 
-    // return a copy to prevent sync issues
-    private List<EndpointDescription> getImportPossibilitiesCopy(String filter) {
-        synchronized (importPossibilities) {
-            List<EndpointDescription> possibilities = importPossibilities.get(filter);
-            return possibilities == null
-                ? Collections.<EndpointDescription>emptyList()
-                : new ArrayList<EndpointDescription>(possibilities);
-        }
     }
 
     private void importServices(String filter) {
-        synchronized (importedServices) {
-            List<ImportRegistration> importRegistrations = importedServices.get(filter);
-            for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) {
-                // TODO but optional: if the service is already imported and the endpoint is still
-                // in the list of possible imports check if a "better" endpoint is now in the list
-                if (!alreadyImported(endpoint, importRegistrations)) {
-                    // service not imported yet -> import it now
-                    ImportRegistration ir = importService(endpoint);
-                    if (ir != null) {
-                        // import was successful
-                        if (importRegistrations == null) {
-                            importRegistrations = new ArrayList<ImportRegistration>();
-                            importedServices.put(filter, importRegistrations);
-                        }
-                        importRegistrations.add(ir);
-                        if (!importAllAvailable) {
-                            return;
-                        }
+        List<ImportRegistration> importRegistrations = get(filter, importedServices);
+        for (EndpointDescription endpoint : get(filter, importPossibilities)) {
+            // TODO but optional: if the service is already imported and the endpoint is still
+            // in the list of possible imports check if a "better" endpoint is now in the list
+            if (!alreadyImported(endpoint, importRegistrations)) {
+                // service not imported yet -> import it now
+                ImportRegistration ir = importService(endpoint);
+                if (ir != null) {
+                    // import was successful
+                    put(filter, importedServices, ir);
+                    if (!importAllAvailable) {
+                        return;
                     }
                 }
             }
@@ -315,25 +276,19 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         // and receiving a RemoteServiceAdminEvent for its unregistration, which results
         // in a ConcurrentModificationException. We avoid this by closing the registrations
         // only after data structure manipulation is done, and being re-entrant.
-        synchronized (importedServices) {
-            List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
-            for (Iterator<List<ImportRegistration>> it1 = importedServices.values().iterator(); it1.hasNext();) {
-                Collection<ImportRegistration> irs = it1.next();
-                for (Iterator<ImportRegistration> it2 = irs.iterator(); it2.hasNext();) {
-                    ImportRegistration ir = it2.next();
-                    if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
-                        removed.add(ir);
-                        it2.remove();
-                    }
+        List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
+        Set<Entry<String, List<ImportRegistration>>> entries = entrySet(importedServices);
+        for (Entry<String, List<ImportRegistration>> entry : entries) {
+            for (ImportRegistration ir : entry.getValue()) {
+                if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
+                    removed.add(ir);
+                    remove(entry.getKey(), importedServices, ir);
                 }
-                if (irs.isEmpty()) {
-                    it1.remove();
-                }
-            }
-            for (ImportRegistration ir : removed) {
-                ir.close();
             }
         }
+        for (ImportRegistration ir : removed) {
+            ir.close();
+        }
     }
 
     public void remoteAdminEvent(RemoteServiceAdminEvent event) {
@@ -342,4 +297,58 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
         }
     }
 
+    private <T> void put(String key, Map<String, List<T>> map, T value) {
+        synchronized (map) {
+            List<T> list = map.get(key);
+            if(list == null) {
+                list = new CopyOnWriteArrayList<T>();
+                map.put(key, list);
+            }
+            //make sure there is no duplicates
+            if(!list.contains(value)) {
+                list.add(value);
+            }
+        }
+    }
+
+    private <T> List<T> get(String key, Map<String, List<T>> map) {
+        synchronized (map) {
+            List<T> list = map.get(key);
+            if(list == null)
+                return Collections.emptyList();
+            return list;
+        }
+    }
+
+    private <T> List<T> remove(String key, Map<String, List<T>> map) {
+        synchronized (map) {
+            return map.remove(key);
+        }
+    }
+
+    private <T> void remove(String key, Map<String, List<T>> map, T value) {
+        synchronized (map) {
+            List<T> list = map.get(key);
+            if (list != null) {
+                list.remove(value);
+                if(list.isEmpty()) {
+                    map.remove(key);
+                }
+            }
+        }
+    }
+
+    private <T> Set<Entry<String, List<T>>> entrySet(Map<String, List<T>> map) {
+        synchronized (map) {
+            Set<Entry<String, List<T>>> entries = map.entrySet();
+            return new HashSet<Entry<String, List<T>>>(entries);
+        }
+    }
+
+    private <T> Set<String> keySet(Map<String, List<T>> map) {
+        synchronized (map) {
+            Set<String> keySet = map.keySet();
+            return new HashSet<String>(keySet);
+        }
+    }
 }


[2/2] aries-rsa git commit: [ARIES-1579] Use cleaner way to start second container

Posted by cs...@apache.org.
[ARIES-1579] Use cleaner way to start second container


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/08463cb2
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/08463cb2
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/08463cb2

Branch: refs/heads/master
Commit: 08463cb2413433c5c17db5fd6daeb3110980d32a
Parents: 7627ef9
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Wed Jun 22 12:07:05 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Wed Jun 22 12:07:05 2016 +0200

----------------------------------------------------------------------
 itests/felix/pom.xml                            |  2 +-
 .../rsa/itests/felix/ServerConfiguration.java   | 12 ++++
 .../rsa/itests/felix/TwoContainerPaxExam.java   | 63 ++++++++++++++++++++
 .../felix/fastbin/TestFastbinRoundTrip.java     | 24 ++------
 .../rsa/itests/felix/tcp/TestRoundTrip.java     | 23 ++-----
 5 files changed, 86 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/pom.xml
----------------------------------------------------------------------
diff --git a/itests/felix/pom.xml b/itests/felix/pom.xml
index 4728c9e..1fead7a 100644
--- a/itests/felix/pom.xml
+++ b/itests/felix/pom.xml
@@ -124,7 +124,7 @@
         </dependency>
         <dependency>
             <groupId>org.ops4j.pax.exam</groupId>
-            <artifactId>pax-exam-container-native</artifactId>
+            <artifactId>pax-exam-container-forked</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java
new file mode 100644
index 0000000..ab4e8f5
--- /dev/null
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java
@@ -0,0 +1,12 @@
+package org.apache.aries.rsa.itests.felix;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface ServerConfiguration {
+
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java
new file mode 100644
index 0000000..555a5ac
--- /dev/null
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java
@@ -0,0 +1,63 @@
+package org.apache.aries.rsa.itests.felix;
+
+import java.lang.reflect.Method;
+
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.model.InitializationError;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+
+public class TwoContainerPaxExam extends PaxExam {
+
+    private Class<?> testClass;
+
+    public TwoContainerPaxExam(Class<?> klass) throws InitializationError {
+        super(klass);
+        this.testClass = klass;
+    }
+
+    @Override
+    public void run(RunNotifier notifier) {
+        TestContainer remoteContainer = null;
+        try {
+            
+            ExamSystem testSystem = PaxExamRuntime.createTestSystem(remoteConfig());
+            remoteContainer = PaxExamRuntime.createContainer(testSystem);
+            remoteContainer.start();
+            super.run(notifier);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (remoteContainer != null) {
+                remoteContainer.stop();
+            }
+        }
+        
+    }
+
+    private Option[] remoteConfig() throws Exception {
+        Object testO = this.testClass.newInstance();
+        Method configMethod = getServerConfigMethod();
+        return (Option[])configMethod.invoke(testO);
+    }
+
+    private Method getServerConfigMethod() throws NoSuchMethodException, SecurityException {
+        Method[] methods = testClass.getMethods();
+        for (Method method : methods) {
+            if (method.getAnnotation(ServerConfiguration.class) != null) {
+                if (method.getParameterTypes().length > 0) {
+                    throw new IllegalArgumentException("ServerConfiguration method must have no params");
+                }
+                if (method.getReturnType() != Option[].class) {
+                    throw new IllegalArgumentException("ServerConfiguration method must return Option[]");
+                }
+                return method;
+            }
+        }
+        throw new IllegalArgumentException("One method must be annotated with @ServerConfiguration");
+    }
+}

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
index a86d303..c73d822 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
@@ -26,29 +26,20 @@ import javax.inject.Inject;
 
 import org.apache.aries.rsa.examples.echotcp.api.EchoService;
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
+import org.apache.aries.rsa.itests.felix.ServerConfiguration;
+import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.ExamSystem;
 import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.TestContainer;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.ops4j.pax.exam.spi.PaxExamRuntime;
 
-@RunWith(PaxExam.class)
+@RunWith(TwoContainerPaxExam.class)
 public class TestFastbinRoundTrip extends RsaTestBase {
-    private static TestContainer remoteContainer;
-
     @Inject
     EchoService echoService;
 
-    public static void startRemote() throws IOException, InterruptedException {
-        ExamSystem testSystem = PaxExamRuntime.createTestSystem(remoteConfig());
-        remoteContainer = PaxExamRuntime.createContainer(testSystem);
-        remoteContainer.start();
-    }
-
-    private static Option[] remoteConfig() throws IOException {
+    @ServerConfiguration
+    public static Option[] remoteConfig() throws IOException {
         return new Option[] {
                              rsaCoreZookeeper(),
                              rsaFastBin(),
@@ -61,7 +52,6 @@ public class TestFastbinRoundTrip extends RsaTestBase {
 
     @Configuration
     public static Option[] configure() throws Exception {
-        startRemote();
         return new Option[] {
                              rsaCoreZookeeper(),
                              rsaFastBin(),
@@ -76,8 +66,4 @@ public class TestFastbinRoundTrip extends RsaTestBase {
         assertEquals("test", echoService.echo("test"));
     }
 
-    public static void shutdownRemote() {
-        remoteContainer.stop();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
index 0f80df2..b3ea522 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
@@ -27,30 +27,21 @@ import javax.inject.Inject;
 
 import org.apache.aries.rsa.examples.echotcp.api.EchoService;
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
+import org.apache.aries.rsa.itests.felix.ServerConfiguration;
+import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.ExamSystem;
 import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.TestContainer;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.ops4j.pax.exam.spi.PaxExamRuntime;
 
-@RunWith(PaxExam.class)
+@RunWith(TwoContainerPaxExam.class)
 public class TestRoundTrip extends RsaTestBase {
 
-    private static TestContainer remoteContainer;
-
     @Inject
     EchoService echoService;
 
-    public static void startRemote() throws IOException, InterruptedException {
-        ExamSystem testSystem = PaxExamRuntime.createTestSystem(remoteConfig());
-        remoteContainer = PaxExamRuntime.createContainer(testSystem);
-        remoteContainer.start();
-    }
-
-    private static Option[] remoteConfig() throws IOException {
+    @ServerConfiguration
+    public static Option[] remoteConfig() throws IOException {
         return new Option[] {
             rsaCoreZookeeper(),
             rsaTcp(),
@@ -62,7 +53,6 @@ public class TestRoundTrip extends RsaTestBase {
 
     @Configuration
     public static Option[] configure() throws Exception {
-        startRemote();
         return new Option[] {
                 rsaCoreZookeeper(),
                 rsaTcp(),
@@ -76,7 +66,4 @@ public class TestRoundTrip extends RsaTestBase {
         assertEquals("test", echoService.echo("test"));
     }
 
-    public static void shutdownRemote() {
-        remoteContainer.stop();
-    }
 }