You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@felix.apache.org by gn...@apache.org on 2015/07/13 17:19:17 UTC

svn commit: r1690728 - /felix/trunk/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java

Author: gnodet
Date: Mon Jul 13 15:19:16 2015
New Revision: 1690728

URL: http://svn.apache.org/r1690728
Log:
[FELIX-4942] Finally introduce some parallelism in the resolver

Modified:
    felix/trunk/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java

Modified: felix/trunk/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java
URL: http://svn.apache.org/viewvc/felix/trunk/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java?rev=1690728&r1=1690727&r2=1690728&view=diff
==============================================================================
--- felix/trunk/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java (original)
+++ felix/trunk/resolver/src/main/java/org/apache/felix/resolver/ResolverImpl.java Mon Jul 13 15:19:16 2015
@@ -29,6 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.felix.resolver.util.ArrayMap;
 import org.apache.felix.resolver.util.OpenHashMap;
@@ -52,6 +58,8 @@ public class ResolverImpl implements Res
 {
     private final Logger m_logger;
 
+    private final int m_parallelism;
+
     // Note this class is not thread safe.
     // Only use in the context of a single thread.
     class ResolveSession
@@ -70,7 +78,7 @@ public class ResolverImpl implements Res
         // removed the offending capabilities
         private Candidates m_multipleCardCandidates = null;
 
-        private final Map<String, List<String>> m_usesCache = new HashMap<String, List<String>>();
+        private final ConcurrentMap<String, List<String>> m_usesCache = new ConcurrentHashMap<String, List<String>>();
 
         ResolveSession(ResolveContext resolveContext)
         {
@@ -102,18 +110,44 @@ public class ResolverImpl implements Res
             return m_resolveContext;
         }
 
-        public Map<String, List<String>> getUsesCache() {
+        public ConcurrentMap<String, List<String>> getUsesCache() {
             return m_usesCache;
         }
     }
 
     public ResolverImpl(Logger logger)
     {
-        m_logger = logger;
+        this(logger, Runtime.getRuntime().availableProcessors()); // delegate to the two-argument constructor, using the JVM's reported processor count as the parallelism
+    }
+
+    public ResolverImpl(Logger logger, int parallelism) // parallelism: size of the thread pool that resolve() creates for each call
+    {
+        this.m_logger = logger;
+        this.m_parallelism = parallelism; // resolve() only creates a pool when this is > 1; otherwise tasks run inline
     }
 
     public Map<Resource, List<Wire>> resolve(ResolveContext rc) throws ResolutionException
     {
+        if (m_parallelism > 1) // parallel path: a dedicated fixed-size pool is created for this call only
+        {
+            ExecutorService executor = Executors.newFixedThreadPool(m_parallelism);
+            try
+            {
+                return doResolve(rc, executor);
+            }
+            finally
+            {
+                executor.shutdownNow(); // always shut the per-call pool down, whether doResolve returned or threw
+            }
+        }
+        else
+        {
+            return doResolve(rc, new DumbExecutor()); // parallelism <= 1: DumbExecutor runs each task inline on the calling thread
+        }
+    }
+
+    private Map<Resource, List<Wire>> doResolve(ResolveContext rc, Executor executor) throws ResolutionException
+    {
         ResolveSession session = new ResolveSession(rc);
         Map<Resource, List<Wire>> wireMap =
             new HashMap<Resource, List<Wire>>();
@@ -267,7 +301,7 @@ public class ResolverImpl implements Res
                     }
 
                     Map<Resource, ResolutionError> currentFaultyResources = new HashMap<Resource, ResolutionError>();
-                    rethrow = checkConsistency(session, allCandidates, currentFaultyResources, hosts, false);
+                    rethrow = checkConsistency(executor, session, allCandidates, currentFaultyResources, hosts, false);
 
                     if (!currentFaultyResources.isEmpty())
                     {
@@ -371,6 +405,7 @@ public class ResolverImpl implements Res
     }
 
     private ResolutionError checkConsistency(
+        Executor executor,
         ResolveSession session,
         Candidates allCandidates,
         Map<Resource, ResolutionError> currentFaultyResources,
@@ -379,7 +414,7 @@ public class ResolverImpl implements Res
     {
         // Calculate package spaces
         Map<Resource, Packages> resourcePkgMap =
-            calculatePackageSpaces(session, allCandidates, hosts.values());
+            calculatePackageSpaces(executor, session, allCandidates, hosts.values());
         ResolutionError error = null;
         // Check package consistency
         Map<Resource, Object> resultCache =
@@ -510,6 +545,7 @@ public class ResolverImpl implements Res
                         // execute code, so we don't need to check for
                         // this case like we do for a normal resolve.
                         rethrow = checkConsistency(
+                                new DumbExecutor(),
                                 session, allCandidates,
                                 new OpenHashMap<Resource, ResolutionError>(resourcePkgMap.size()),
                                 Collections.singletonMap(host, allCandidates.getWrappedHost(host)),
@@ -1028,35 +1064,48 @@ public class ResolverImpl implements Res
         }
     }
 
-    private static void getWireCandidatesAndRecurse(
-            ResolveSession session,
-            Candidates allCandidates,
-            Map<Resource, List<WireCandidate>> allWireCandidates,
-            Resource resource
-    )
-    {
-        List<WireCandidate> wireCandidates = getWireCandidates(session, allCandidates, resource);
-        allWireCandidates.put(resource, wireCandidates);
-        for (WireCandidate w : wireCandidates)
-        {
-            Resource r = w.capability.getResource();
-            if (!allWireCandidates.containsKey(r))
-            {
-                getWireCandidatesAndRecurse(session, allCandidates, allWireCandidates, r);
-            }
-        }
-    }
-
     private static Map<Resource, Packages> calculatePackageSpaces(
+            final Executor innerExecutor,
             final ResolveSession session,
             final Candidates allCandidates,
             Collection<Resource> hosts)
     {
+        final EnhancedExecutor executor = new EnhancedExecutor(innerExecutor);
+
         // Parallel compute wire candidates
-        final Map<Resource, List<WireCandidate>> allWireCandidates = new OpenHashMap<Resource, List<WireCandidate>>();
-        for (Resource resource : hosts)
+        final Map<Resource, List<WireCandidate>> allWireCandidates = new ConcurrentHashMap<Resource, List<WireCandidate>>();
         {
-            getWireCandidatesAndRecurse(session, allCandidates, allWireCandidates, resource);
+            final ConcurrentMap<Resource, Runnable> tasks = new ConcurrentHashMap<Resource, Runnable>(allCandidates.getNbResources());
+            class Computer implements Runnable // task: computes the wire candidates of one resource, then schedules a task for each resource those candidates belong to
+            {
+                final Resource resource;
+                public Computer(Resource resource)
+                {
+                    this.resource = resource;
+                }
+                public void run()
+                {
+                    List<WireCandidate> wireCandidates = getWireCandidates(session, allCandidates, resource);
+                    allWireCandidates.put(resource, wireCandidates); // publish the result in the shared concurrent map
+                    for (WireCandidate w : wireCandidates)
+                    {
+                        Resource u = w.capability.getResource();
+                        if (!tasks.containsKey(u)) // pre-check: skip allocating a Computer when u already has a task registered
+                        {
+                            Computer c = new Computer(u);
+                            if (tasks.putIfAbsent(u, c) == null) // atomic claim: only the thread whose insert succeeded schedules u
+                            {
+                                executor.execute(c); // NOTE(review): the host tasks submitted after this class are never put in 'tasks', so a host reached here is computed a second time - confirm that is acceptable
+                            }
+                        }
+                    }
+                }
+            }
+            for (Resource resource : hosts)
+            {
+                executor.execute(new Computer(resource));
+            }
+            executor.await();
         }
 
         // Parallel get all exported packages
@@ -1065,14 +1114,28 @@ public class ResolverImpl implements Res
         {
             final Packages packages = new Packages(resource);
             allPackages.put(resource, packages);
-            calculateExportedPackages(session, allCandidates, resource, packages.m_exportedPkgs);
+            executor.execute(new Runnable()
+            {
+                public void run()
+                {
+                    calculateExportedPackages(session, allCandidates, resource, packages.m_exportedPkgs);
+                }
+            });
         }
+        executor.await();
 
         // Parallel compute package lists
         for (final Resource resource : allWireCandidates.keySet())
         {
-            getPackages(session, allCandidates, allWireCandidates, allPackages, resource, allPackages.get(resource));
+            executor.execute(new Runnable()
+            {
+                public void run()
+                {
+                    getPackages(session, allCandidates, allWireCandidates, allPackages, resource, allPackages.get(resource));
+                }
+            });
         }
+        executor.await();
 
         // Sequential compute package sources
         // TODO: make that parallel
@@ -1084,8 +1147,15 @@ public class ResolverImpl implements Res
         // Parallel compute uses
         for (final Resource resource : allWireCandidates.keySet())
         {
-            computeUses(session, allWireCandidates, allPackages, resource);
+            executor.execute(new Runnable()
+            {
+                public void run()
+                {
+                    computeUses(session, allWireCandidates, allPackages, resource);
+                }
+            });
         }
+        executor.await();
 
         return allPackages;
     }
@@ -2247,4 +2317,102 @@ public class ResolverImpl implements Res
         }
     }
 
+    /**
+     * Wraps an {@link Executor} and keeps a count of the tasks submitted
+     * through it that have not finished yet, so that a caller can block in
+     * {@link #await()} until every submitted task has completed.
+     * <p>
+     * A task may submit further tasks from within its own {@code run()};
+     * it is still counted while doing so, so the count cannot reach zero
+     * before the tasks it spawned have been registered.
+     * <p>
+     * A throwable raised by a task is not captured here: the count is still
+     * decremented, and the throwable propagates on whichever thread the
+     * wrapped executor used to run the task.
+     */
+    private static class EnhancedExecutor
+    {
+        private final Executor executor;
+        // Number of submitted tasks that have not completed; also used as
+        // the monitor for wait/notify between execute() and await().
+        private final AtomicInteger count = new AtomicInteger();
+
+        public EnhancedExecutor(Executor executor)
+        {
+            this.executor = executor;
+        }
+
+        /**
+         * Submits a task to the wrapped executor and counts it as pending
+         * until its {@code run()} method returns or throws.
+         *
+         * @param runnable the task to run
+         */
+        public void execute(final Runnable runnable)
+        {
+            // Count the task before handing it over so that await() cannot
+            // observe zero while the task is queued but not yet started.
+            count.incrementAndGet();
+            executor.execute(new Runnable()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        runnable.run();
+                    }
+                    finally
+                    {
+                        // Only the task that brings the count to zero
+                        // needs to wake up threads blocked in await().
+                        if (count.decrementAndGet() == 0)
+                        {
+                            synchronized (count)
+                            {
+                                count.notifyAll();
+                            }
+                        }
+                    }
+                }
+            });
+        }
+
+        /**
+         * Blocks the calling thread until no submitted task is pending.
+         *
+         * @throws IllegalStateException if the calling thread is interrupted
+         *         while waiting; the thread's interrupt status is restored
+         *         before the exception is thrown
+         */
+        public void await()
+        {
+            synchronized (count)
+            {
+                try
+                {
+                    // Object.wait() may return without a notification
+                    // (spurious wakeup), so re-check the count in a loop.
+                    while (count.get() > 0)
+                    {
+                        count.wait();
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    // Preserve the interrupt for code further up the stack.
+                    Thread.currentThread().interrupt();
+                    throw new IllegalStateException(e);
+                }
+            }
+        }
+    }
+
+    static class DumbExecutor implements Executor // Executor that runs every task synchronously on the submitting thread
+    {
+        public void execute(Runnable command)
+        {
+            command.run(); // runs inline: execute() returns only once the task has finished, and anything it throws reaches the caller
+        }
+    }
+
 }