You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2016/06/22 10:07:12 UTC
[1/2] aries-rsa git commit: [ARIES-1577] Deadlock in TopologyManagerImport
Repository: aries-rsa
Updated Branches:
refs/heads/master 7c73a3710 -> 08463cb24
[ARIES-1577] Deadlock in TopologyManagerImport
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/7627ef98
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/7627ef98
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/7627ef98
Branch: refs/heads/master
Commit: 7627ef9867110cd072ec0a67d1b2642b205156c4
Parents: 7c73a37
Author: Johannes Utzig <j....@seeburger.de>
Authored: Tue Jun 21 11:44:27 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Wed Jun 22 09:28:29 2016 +0200
----------------------------------------------------------------------
.../importer/TopologyManagerImport.java | 187 ++++++++++---------
1 file changed, 98 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/7627ef98/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
----------------------------------------------------------------------
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index e548288..3b98710 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -19,14 +19,14 @@
package org.apache.aries.rsa.topologymanager.importer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -125,12 +125,10 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
if (importInterestsCounter.remove(filter) == 0) {
LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
endpointListenerManager.reduceScope(filter);
- synchronized (importedServices) {
- List<ImportRegistration> irs = importedServices.remove(filter);
- if (irs != null) {
- for (ImportRegistration ir : irs) {
- ir.close();
- }
+ List<ImportRegistration> irs = remove(filter, importedServices);
+ if (irs != null) {
+ for (ImportRegistration ir : irs) {
+ ir.close();
}
}
}
@@ -153,39 +151,24 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
}
private void addImportPossibility(EndpointDescription endpoint, String filter) {
- synchronized (importPossibilities) {
- List<EndpointDescription> endpoints = importPossibilities.get(filter);
- if (endpoints == null) {
- endpoints = new ArrayList<EndpointDescription>();
- importPossibilities.put(filter, endpoints);
- }
- // prevent adding the same endpoint multiple times, which can happen sometimes,
- // and which causes imports to remain available even when services are actually down
- if (!endpoints.contains(endpoint)) {
- endpoints.add(endpoint);
- }
- }
+ put(filter, importPossibilities, endpoint);
}
private void removeImportPossibility(EndpointDescription endpoint, String filter) {
- synchronized (importPossibilities) {
- List<EndpointDescription> endpoints = importPossibilities.get(filter);
- if (endpoints != null) {
- endpoints.remove(endpoint);
- if (endpoints.isEmpty()) {
- importPossibilities.remove(filter);
- }
- }
+ List<EndpointDescription> endpoints = get(filter, importPossibilities);
+ remove(filter, importPossibilities, endpoint);
+ if (endpoints.isEmpty()) {
+ remove(filter,importPossibilities,null);
}
}
public void add(RemoteServiceAdmin rsa) {
rsaSet.add(rsa);
- synchronized (importPossibilities) {
- for (String filter : importPossibilities.keySet()) {
- triggerImport(filter);
- }
+
+ for (String filter : keySet(importPossibilities)) {
+ triggerImport(filter);
}
+
}
public void remove(RemoteServiceAdmin rsa) {
@@ -210,56 +193,34 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
}
private void unexportNotAvailableServices(String filter) {
- synchronized (importedServices) {
- List<ImportRegistration> importRegistrations = importedServices.get(filter);
- if (importRegistrations != null) {
- // iterate over a copy
- for (ImportRegistration ir : new ArrayList<ImportRegistration>(importRegistrations)) {
- EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
- if (!isImportPossibilityAvailable(endpoint, filter)) {
- removeImport(ir, null); // also unexports the service
- }
- }
+ List<ImportRegistration> importRegistrations = get(filter, importedServices);
+ for (ImportRegistration ir : importRegistrations) {
+ EndpointDescription endpoint = ir.getImportReference().getImportedEndpoint();
+ if (!isImportPossibilityAvailable(endpoint, filter)) {
+ removeImport(ir, null); // also unexports the service
}
}
}
private boolean isImportPossibilityAvailable(EndpointDescription endpoint, String filter) {
- synchronized (importPossibilities) {
- List<EndpointDescription> endpoints = importPossibilities.get(filter);
- return endpoints != null && endpoints.contains(endpoint);
- }
- }
+ List<EndpointDescription> endpoints = get(filter, importPossibilities);
+ return endpoints != null && endpoints.contains(endpoint);
- // return a copy to prevent sync issues
- private List<EndpointDescription> getImportPossibilitiesCopy(String filter) {
- synchronized (importPossibilities) {
- List<EndpointDescription> possibilities = importPossibilities.get(filter);
- return possibilities == null
- ? Collections.<EndpointDescription>emptyList()
- : new ArrayList<EndpointDescription>(possibilities);
- }
}
private void importServices(String filter) {
- synchronized (importedServices) {
- List<ImportRegistration> importRegistrations = importedServices.get(filter);
- for (EndpointDescription endpoint : getImportPossibilitiesCopy(filter)) {
- // TODO but optional: if the service is already imported and the endpoint is still
- // in the list of possible imports check if a "better" endpoint is now in the list
- if (!alreadyImported(endpoint, importRegistrations)) {
- // service not imported yet -> import it now
- ImportRegistration ir = importService(endpoint);
- if (ir != null) {
- // import was successful
- if (importRegistrations == null) {
- importRegistrations = new ArrayList<ImportRegistration>();
- importedServices.put(filter, importRegistrations);
- }
- importRegistrations.add(ir);
- if (!importAllAvailable) {
- return;
- }
+ List<ImportRegistration> importRegistrations = get(filter, importedServices);
+ for (EndpointDescription endpoint : get(filter, importPossibilities)) {
+ // TODO but optional: if the service is already imported and the endpoint is still
+ // in the list of possible imports check if a "better" endpoint is now in the list
+ if (!alreadyImported(endpoint, importRegistrations)) {
+ // service not imported yet -> import it now
+ ImportRegistration ir = importService(endpoint);
+ if (ir != null) {
+ // import was successful
+ put(filter, importedServices, ir);
+ if (!importAllAvailable) {
+ return;
}
}
}
@@ -315,25 +276,19 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
// and receiving a RemoteServiceAdminEvent for its unregistration, which results
// in a ConcurrentModificationException. We avoid this by closing the registrations
// only after data structure manipulation is done, and being re-entrant.
- synchronized (importedServices) {
- List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
- for (Iterator<List<ImportRegistration>> it1 = importedServices.values().iterator(); it1.hasNext();) {
- Collection<ImportRegistration> irs = it1.next();
- for (Iterator<ImportRegistration> it2 = irs.iterator(); it2.hasNext();) {
- ImportRegistration ir = it2.next();
- if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
- removed.add(ir);
- it2.remove();
- }
+ List<ImportRegistration> removed = new ArrayList<ImportRegistration>();
+ Set<Entry<String, List<ImportRegistration>>> entries = entrySet(importedServices);
+ for (Entry<String, List<ImportRegistration>> entry : entries) {
+ for (ImportRegistration ir : entry.getValue()) {
+ if (ir.equals(reg) || ir.getImportReference().equals(ref)) {
+ removed.add(ir);
+ remove(entry.getKey(), importedServices, ir);
}
- if (irs.isEmpty()) {
- it1.remove();
- }
- }
- for (ImportRegistration ir : removed) {
- ir.close();
}
}
+ for (ImportRegistration ir : removed) {
+ ir.close();
+ }
}
public void remoteAdminEvent(RemoteServiceAdminEvent event) {
@@ -342,4 +297,58 @@ public class TopologyManagerImport implements EndpointListener, RemoteServiceAdm
}
}
+ private <T> void put(String key, Map<String, List<T>> map, T value) {
+ synchronized (map) {
+ List<T> list = map.get(key);
+ if(list == null) {
+ list = new CopyOnWriteArrayList<T>();
+ map.put(key, list);
+ }
+ //make sure there is no duplicates
+ if(!list.contains(value)) {
+ list.add(value);
+ }
+ }
+ }
+
+ private <T> List<T> get(String key, Map<String, List<T>> map) {
+ synchronized (map) {
+ List<T> list = map.get(key);
+ if(list == null)
+ return Collections.emptyList();
+ return list;
+ }
+ }
+
+ private <T> List<T> remove(String key, Map<String, List<T>> map) {
+ synchronized (map) {
+ return map.remove(key);
+ }
+ }
+
+ private <T> void remove(String key, Map<String, List<T>> map, T value) {
+ synchronized (map) {
+ List<T> list = map.get(key);
+ if (list != null) {
+ list.remove(value);
+ if(list.isEmpty()) {
+ map.remove(key);
+ }
+ }
+ }
+ }
+
+ private <T> Set<Entry<String, List<T>>> entrySet(Map<String, List<T>> map) {
+ synchronized (map) {
+ Set<Entry<String, List<T>>> entries = map.entrySet();
+ return new HashSet<Entry<String, List<T>>>(entries);
+ }
+ }
+
+ private <T> Set<String> keySet(Map<String, List<T>> map) {
+ synchronized (map) {
+ Set<String> keySet = map.keySet();
+ return new HashSet<String>(keySet);
+ }
+ }
}
[2/2] aries-rsa git commit: [ARIES-1579] Use cleaner way to start second container
Posted by cs...@apache.org.
[ARIES-1579] Use cleaner way to start second container
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/08463cb2
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/08463cb2
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/08463cb2
Branch: refs/heads/master
Commit: 08463cb2413433c5c17db5fd6daeb3110980d32a
Parents: 7627ef9
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Wed Jun 22 12:07:05 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Wed Jun 22 12:07:05 2016 +0200
----------------------------------------------------------------------
itests/felix/pom.xml | 2 +-
.../rsa/itests/felix/ServerConfiguration.java | 12 ++++
.../rsa/itests/felix/TwoContainerPaxExam.java | 63 ++++++++++++++++++++
.../felix/fastbin/TestFastbinRoundTrip.java | 24 ++------
.../rsa/itests/felix/tcp/TestRoundTrip.java | 23 ++-----
5 files changed, 86 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/pom.xml
----------------------------------------------------------------------
diff --git a/itests/felix/pom.xml b/itests/felix/pom.xml
index 4728c9e..1fead7a 100644
--- a/itests/felix/pom.xml
+++ b/itests/felix/pom.xml
@@ -124,7 +124,7 @@
</dependency>
<dependency>
<groupId>org.ops4j.pax.exam</groupId>
- <artifactId>pax-exam-container-native</artifactId>
+ <artifactId>pax-exam-container-forked</artifactId>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java
new file mode 100644
index 0000000..ab4e8f5
--- /dev/null
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/ServerConfiguration.java
@@ -0,0 +1,12 @@
+package org.apache.aries.rsa.itests.felix;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface ServerConfiguration {
+
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java
new file mode 100644
index 0000000..555a5ac
--- /dev/null
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/TwoContainerPaxExam.java
@@ -0,0 +1,63 @@
+package org.apache.aries.rsa.itests.felix;
+
+import java.lang.reflect.Method;
+
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.model.InitializationError;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+
+public class TwoContainerPaxExam extends PaxExam {
+
+ private Class<?> testClass;
+
+ public TwoContainerPaxExam(Class<?> klass) throws InitializationError {
+ super(klass);
+ this.testClass = klass;
+ }
+
+ @Override
+ public void run(RunNotifier notifier) {
+ TestContainer remoteContainer = null;
+ try {
+
+ ExamSystem testSystem = PaxExamRuntime.createTestSystem(remoteConfig());
+ remoteContainer = PaxExamRuntime.createContainer(testSystem);
+ remoteContainer.start();
+ super.run(notifier);
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (remoteContainer != null) {
+ remoteContainer.stop();
+ }
+ }
+
+ }
+
+ private Option[] remoteConfig() throws Exception {
+ Object testO = this.testClass.newInstance();
+ Method configMethod = getServerConfigMethod();
+ return (Option[])configMethod.invoke(testO);
+ }
+
+ private Method getServerConfigMethod() throws NoSuchMethodException, SecurityException {
+ Method[] methods = testClass.getMethods();
+ for (Method method : methods) {
+ if (method.getAnnotation(ServerConfiguration.class) != null) {
+ if (method.getParameterTypes().length > 0) {
+ throw new IllegalArgumentException("ServerConfiguration method must have no params");
+ }
+ if (method.getReturnType() != Option[].class) {
+ throw new IllegalArgumentException("ServerConfiguration method must return Option[]");
+ }
+ return method;
+ }
+ }
+ throw new IllegalArgumentException("One method must be annotated with @ServerConfiguration");
+ }
+}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
index a86d303..c73d822 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/fastbin/TestFastbinRoundTrip.java
@@ -26,29 +26,20 @@ import javax.inject.Inject;
import org.apache.aries.rsa.examples.echotcp.api.EchoService;
import org.apache.aries.rsa.itests.felix.RsaTestBase;
+import org.apache.aries.rsa.itests.felix.ServerConfiguration;
+import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.ExamSystem;
import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.TestContainer;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.ops4j.pax.exam.spi.PaxExamRuntime;
-@RunWith(PaxExam.class)
+@RunWith(TwoContainerPaxExam.class)
public class TestFastbinRoundTrip extends RsaTestBase {
- private static TestContainer remoteContainer;
-
@Inject
EchoService echoService;
- public static void startRemote() throws IOException, InterruptedException {
- ExamSystem testSystem = PaxExamRuntime.createTestSystem(remoteConfig());
- remoteContainer = PaxExamRuntime.createContainer(testSystem);
- remoteContainer.start();
- }
-
- private static Option[] remoteConfig() throws IOException {
+ @ServerConfiguration
+ public static Option[] remoteConfig() throws IOException {
return new Option[] {
rsaCoreZookeeper(),
rsaFastBin(),
@@ -61,7 +52,6 @@ public class TestFastbinRoundTrip extends RsaTestBase {
@Configuration
public static Option[] configure() throws Exception {
- startRemote();
return new Option[] {
rsaCoreZookeeper(),
rsaFastBin(),
@@ -76,8 +66,4 @@ public class TestFastbinRoundTrip extends RsaTestBase {
assertEquals("test", echoService.echo("test"));
}
- public static void shutdownRemote() {
- remoteContainer.stop();
- }
-
}
http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/08463cb2/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
----------------------------------------------------------------------
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
index 0f80df2..b3ea522 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
@@ -27,30 +27,21 @@ import javax.inject.Inject;
import org.apache.aries.rsa.examples.echotcp.api.EchoService;
import org.apache.aries.rsa.itests.felix.RsaTestBase;
+import org.apache.aries.rsa.itests.felix.ServerConfiguration;
+import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
-import org.ops4j.pax.exam.ExamSystem;
import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.TestContainer;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.ops4j.pax.exam.spi.PaxExamRuntime;
-@RunWith(PaxExam.class)
+@RunWith(TwoContainerPaxExam.class)
public class TestRoundTrip extends RsaTestBase {
- private static TestContainer remoteContainer;
-
@Inject
EchoService echoService;
- public static void startRemote() throws IOException, InterruptedException {
- ExamSystem testSystem = PaxExamRuntime.createTestSystem(remoteConfig());
- remoteContainer = PaxExamRuntime.createContainer(testSystem);
- remoteContainer.start();
- }
-
- private static Option[] remoteConfig() throws IOException {
+ @ServerConfiguration
+ public static Option[] remoteConfig() throws IOException {
return new Option[] {
rsaCoreZookeeper(),
rsaTcp(),
@@ -62,7 +53,6 @@ public class TestRoundTrip extends RsaTestBase {
@Configuration
public static Option[] configure() throws Exception {
- startRemote();
return new Option[] {
rsaCoreZookeeper(),
rsaTcp(),
@@ -76,7 +66,4 @@ public class TestRoundTrip extends RsaTestBase {
assertEquals("test", echoService.echo("test"));
}
- public static void shutdownRemote() {
- remoteContainer.stop();
- }
}