You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by cs...@apache.org on 2023/02/07 11:03:37 UTC

[maven-resolver] branch master updated: [MRESOLVER-318] Cleanup redundant code and centralize executor handling (#239)

This is an automated email from the ASF dual-hosted git repository.

cstamas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-resolver.git


The following commit(s) were added to refs/heads/master by this push:
     new 90023ba9 [MRESOLVER-318] Cleanup redundant code and centralize executor handling (#239)
90023ba9 is described below

commit 90023ba935e2eb5bf0e71bbbf775b5e97d6435f1
Author: Tamas Cservenak <ta...@cservenak.net>
AuthorDate: Tue Feb 7 12:03:31 2023 +0100

    [MRESOLVER-318] Cleanup redundant code and centralize executor handling (#239)
    
    Cleanup of scattered/copy pasted code and adding utility class.
    
    ---
    
    https://issues.apache.org/jira/browse/MRESOLVER-318
---
 .../connector/basic/BasicRepositoryConnector.java  |  59 ++++-------
 .../internal/impl/DefaultMetadataResolver.java     |  35 +------
 .../impl/collect/bf/BfDependencyCollector.java     |  11 +--
 .../aether/util/concurrency/ExecutorUtils.java     | 109 +++++++++++++++++++++
 4 files changed, 139 insertions(+), 75 deletions(-)

diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java
index 6dd3e680..60f7412d 100644
--- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java
+++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java
@@ -33,10 +33,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.eclipse.aether.ConfigurationProperties;
@@ -70,8 +66,8 @@ import org.eclipse.aether.transfer.TransferResource;
 import org.eclipse.aether.transform.FileTransformer;
 import org.eclipse.aether.util.ConfigUtils;
 import org.eclipse.aether.util.FileUtils;
+import org.eclipse.aether.util.concurrency.ExecutorUtils;
 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
-import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,31 +141,27 @@ final class BasicRepositoryConnector
         this.providedChecksumsSources = providedChecksumsSources;
         this.closed = new AtomicBoolean( false );
 
-        maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
+        maxThreads = ExecutorUtils.threadCount( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
         smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
         persistedChecksums =
                 ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
                         ConfigurationProperties.PERSISTED_CHECKSUMS );
     }
 
-    private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
+    private Executor getExecutor( int tasks )
     {
         if ( maxThreads <= 1 )
         {
-            return DirectExecutor.INSTANCE;
+            return ExecutorUtils.DIRECT_EXECUTOR;
         }
-        int tasks = safe( artifacts ).size() + safe( metadatas ).size();
         if ( tasks <= 1 )
         {
-            return DirectExecutor.INSTANCE;
+            return ExecutorUtils.DIRECT_EXECUTOR;
         }
         if ( executor == null )
         {
-            executor =
-                    new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
-                            new LinkedBlockingQueue<>(),
-                            new WorkerThreadFactory( getClass().getSimpleName() + '-'
-                                    + repository.getHost() + '-' ) );
+            executor = ExecutorUtils.threadPool( maxThreads,
+                    getClass().getSimpleName() + '-' + repository.getHost() + '-' );
         }
         return executor;
     }
@@ -193,10 +185,7 @@ final class BasicRepositoryConnector
     {
         if ( closed.compareAndSet( false, true ) )
         {
-            if ( executor instanceof ExecutorService )
-            {
-                ( (ExecutorService) executor ).shutdown();
-            }
+            ExecutorUtils.shutdown( executor );
             transporter.close();
         }
     }
@@ -215,11 +204,14 @@ final class BasicRepositoryConnector
     {
         failIfClosed();
 
-        Executor executor = getExecutor( artifactDownloads, metadataDownloads );
+        Collection<? extends ArtifactDownload> safeArtifactDownloads = safe( artifactDownloads );
+        Collection<? extends MetadataDownload> safeMetadataDownloads = safe( metadataDownloads );
+
+        Executor executor = getExecutor( safeArtifactDownloads.size() + safeMetadataDownloads.size() );
         RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
         List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
 
-        for ( MetadataDownload transfer : safe( metadataDownloads ) )
+        for ( MetadataDownload transfer : safeMetadataDownloads )
         {
             URI location = layout.getLocation( transfer.getMetadata(), false );
 
@@ -239,7 +231,7 @@ final class BasicRepositoryConnector
             executor.execute( errorForwarder.wrap( task ) );
         }
 
-        for ( ArtifactDownload transfer : safe( artifactDownloads ) )
+        for ( ArtifactDownload transfer : safeArtifactDownloads )
         {
             Map<String, String> providedChecksums = Collections.emptyMap();
             for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() )
@@ -289,7 +281,10 @@ final class BasicRepositoryConnector
     {
         failIfClosed();
 
-        for ( ArtifactUpload transfer : safe( artifactUploads ) )
+        Collection<? extends ArtifactUpload> safeArtifactUploads = safe( artifactUploads );
+        Collection<? extends MetadataUpload> safeMetadataUploads = safe( metadataUploads );
+
+        for ( ArtifactUpload transfer : safeArtifactUploads )
         {
             URI location = layout.getLocation( transfer.getArtifact(), true );
 
@@ -302,10 +297,11 @@ final class BasicRepositoryConnector
 
             Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(),
                     checksumLocations, listener );
+
             task.run();
         }
 
-        for ( MetadataUpload transfer : safe( metadataUploads ) )
+        for ( MetadataUpload transfer : safeMetadataUploads )
         {
             URI location = layout.getLocation( transfer.getMetadata(), true );
 
@@ -317,6 +313,7 @@ final class BasicRepositoryConnector
                     layout.getChecksumLocations( transfer.getMetadata(), true, location );
 
             Runnable task = new PutTaskRunner( location, transfer.getFile(), checksumLocations, listener );
+
             task.run();
         }
     }
@@ -622,18 +619,4 @@ final class BasicRepositoryConnector
         }
 
     }
-
-    private static class DirectExecutor
-            implements Executor
-    {
-
-        static final Executor INSTANCE = new DirectExecutor();
-
-        @Override
-        public void execute( Runnable command )
-        {
-            command.run();
-        }
-
-    }
 }
diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java
index e5604834..7585c73a 100644
--- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java
+++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java
@@ -28,10 +28,6 @@ import java.util.List;
 import java.util.Map;
 import static java.util.Objects.requireNonNull;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -71,9 +67,8 @@ import org.eclipse.aether.transfer.MetadataNotFoundException;
 import org.eclipse.aether.transfer.MetadataTransferException;
 import org.eclipse.aether.transfer.NoRepositoryConnectorException;
 import org.eclipse.aether.transfer.RepositoryOfflineException;
-import org.eclipse.aether.util.ConfigUtils;
+import org.eclipse.aether.util.concurrency.ExecutorUtils;
 import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
-import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
 
 /**
  */
@@ -374,8 +369,9 @@ public class DefaultMetadataResolver
 
         if ( !tasks.isEmpty() )
         {
-            int threads = ConfigUtils.getInteger( session, 4, CONFIG_PROP_THREADS );
-            Executor executor = getExecutor( Math.min( tasks.size(), threads ) );
+            int threads = ExecutorUtils.threadCount( session, 4, CONFIG_PROP_THREADS );
+            Executor executor =  ExecutorUtils.executor(
+                    Math.min( tasks.size(), threads ), getClass().getSimpleName() + '-' );
             try
             {
                 RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
@@ -407,7 +403,7 @@ public class DefaultMetadataResolver
             }
             finally
             {
-                shutdown( executor );
+                ExecutorUtils.shutdown( executor );
             }
             for ( ResolveTask task : tasks )
             {
@@ -531,27 +527,6 @@ public class DefaultMetadataResolver
         repositoryEventDispatcher.dispatch( event.build() );
     }
 
-    private Executor getExecutor( int threads )
-    {
-        if ( threads <= 1 )
-        {
-            return command -> command.run();
-        }
-        else
-        {
-            return new ThreadPoolExecutor( threads, threads, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
-                                           new WorkerThreadFactory( null ) );
-        }
-    }
-
-    private void shutdown( Executor executor )
-    {
-        if ( executor instanceof ExecutorService )
-        {
-            ( (ExecutorService) executor ).shutdown();
-        }
-    }
-
     class ResolveTask
         implements Runnable
     {
diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java
index c6e63e56..209de197 100644
--- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java
+++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java
@@ -37,9 +37,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -74,7 +71,7 @@ import org.eclipse.aether.resolution.VersionRangeResult;
 import org.eclipse.aether.spi.locator.Service;
 import org.eclipse.aether.util.ConfigUtils;
 import org.eclipse.aether.util.artifact.ArtifactIdUtils;
-import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
+import org.eclipse.aether.util.concurrency.ExecutorUtils;
 import org.eclipse.aether.util.graph.manager.DependencyManagerUtils;
 import org.eclipse.aether.version.Version;
 
@@ -145,7 +142,8 @@ public class BfDependencyCollector
         boolean useSkip = ConfigUtils.getBoolean(
                 session, CONFIG_PROP_SKIPPER_DEFAULT, CONFIG_PROP_SKIPPER
         );
-        int nThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
+        int nThreads = ExecutorUtils.threadCount(
+                session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
         logger.debug( "Using thread pool with {} threads to resolve descriptors.", nThreads );
 
         if ( useSkip )
@@ -485,8 +483,7 @@ public class BfDependencyCollector
 
         ParallelDescriptorResolver( int threads )
         {
-            this.executorService = new ThreadPoolExecutor( threads, threads, 3L, TimeUnit.SECONDS,
-                    new LinkedBlockingQueue<>(), new WorkerThreadFactory( getClass().getSimpleName() + "-" ) );
+            this.executorService = ExecutorUtils.threadPool( threads, getClass().getSimpleName() + "-" );
         }
 
         void resolveDescriptors( Artifact artifact, Callable<DescriptorResolutionResult> callable )
diff --git a/maven-resolver-util/src/main/java/org/eclipse/aether/util/concurrency/ExecutorUtils.java b/maven-resolver-util/src/main/java/org/eclipse/aether/util/concurrency/ExecutorUtils.java
new file mode 100644
index 00000000..716cfecb
--- /dev/null
+++ b/maven-resolver-util/src/main/java/org/eclipse/aether/util/concurrency/ExecutorUtils.java
@@ -0,0 +1,109 @@
+package org.eclipse.aether.util.concurrency;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.aether.RepositorySystemSession;
+import org.eclipse.aether.util.ConfigUtils;
+
+/**
+ * Utilities for executors and sizing them.
+ *
+ * @since 1.9.5
+ */
+public final class ExecutorUtils
+{
+    /**
+     * Shared instance of "direct executor".
+     */
+    public static final Executor DIRECT_EXECUTOR = Runnable::run;
+
+    /**
+     * Creates new thread pool {@link ExecutorService}. The {@code poolSize} parameter must be greater than 1.
+     */
+    public static ExecutorService threadPool( int poolSize, String namePrefix )
+    {
+        if ( poolSize < 2 )
+        {
+            throw new IllegalArgumentException(
+                    "Invalid poolSize: " + poolSize + ". Must be greater than 1." );
+        }
+        return new ThreadPoolExecutor( poolSize, poolSize, 3L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new WorkerThreadFactory( namePrefix )
+        );
+    }
+
+    /**
+     * Returns {@link #DIRECT_EXECUTOR} or result of {@link #threadPool(int, String)} depending on value of
+     * {@code size} parameter.
+     */
+    public static Executor executor( int size, String namePrefix )
+    {
+        if ( size <= 1 )
+        {
+            return DIRECT_EXECUTOR;
+        }
+        else
+        {
+            return threadPool( size, namePrefix );
+        }
+    }
+
+    /**
+     * To be used with result of {@link #executor(int, String)} method, shuts down instance if it is
+     * {@link ExecutorService}.
+     */
+    public static void shutdown( Executor executor )
+    {
+        if ( executor instanceof ExecutorService )
+        {
+            ( (ExecutorService) executor ).shutdown();
+        }
+    }
+
+    /**
+     * Retrieves and validates requested thread count based on session and specified keys, or if none provided, the
+     * provided default value. This method validates result on top of what {@link ConfigUtils} does.
+     *
+     * @throws IllegalArgumentException if default value is less than 1.
+     * @see ConfigUtils#getInteger(RepositorySystemSession, int, String...)
+     */
+    public static int threadCount( RepositorySystemSession session, int defaultValue, String... keys )
+    {
+        if ( defaultValue < 1 )
+        {
+            throw new IllegalArgumentException(
+                    "Invalid defaultValue: " + defaultValue + ". Must be greater than 0." );
+        }
+        int threadCount = ConfigUtils.getInteger( session, defaultValue, keys );
+        if ( threadCount < 1 )
+        {
+            throw new IllegalArgumentException(
+                    "Invalid value: " + threadCount + ". Must be greater than 0." );
+        }
+        return threadCount;
+    }
+}