You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by cs...@apache.org on 2023/02/07 11:03:37 UTC
[maven-resolver] branch master updated: [MRESOLVER-318] Cleanup redundant code and centralize executor handling (#239)
This is an automated email from the ASF dual-hosted git repository.
cstamas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-resolver.git
The following commit(s) were added to refs/heads/master by this push:
new 90023ba9 [MRESOLVER-318] Cleanup redundant code and centralize executor handling (#239)
90023ba9 is described below
commit 90023ba935e2eb5bf0e71bbbf775b5e97d6435f1
Author: Tamas Cservenak <ta...@cservenak.net>
AuthorDate: Tue Feb 7 12:03:31 2023 +0100
[MRESOLVER-318] Cleanup redundant code and centralize executor handling (#239)
Cleanup of scattered/copy pasted code and adding utility class.
---
https://issues.apache.org/jira/browse/MRESOLVER-318
---
.../connector/basic/BasicRepositoryConnector.java | 59 ++++-------
.../internal/impl/DefaultMetadataResolver.java | 35 +------
.../impl/collect/bf/BfDependencyCollector.java | 11 +--
.../aether/util/concurrency/ExecutorUtils.java | 109 +++++++++++++++++++++
4 files changed, 139 insertions(+), 75 deletions(-)
diff --git a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java
index 6dd3e680..60f7412d 100644
--- a/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java
+++ b/maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java
@@ -33,10 +33,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.aether.ConfigurationProperties;
@@ -70,8 +66,8 @@ import org.eclipse.aether.transfer.TransferResource;
import org.eclipse.aether.transform.FileTransformer;
import org.eclipse.aether.util.ConfigUtils;
import org.eclipse.aether.util.FileUtils;
+import org.eclipse.aether.util.concurrency.ExecutorUtils;
import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
-import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,31 +141,27 @@ final class BasicRepositoryConnector
this.providedChecksumsSources = providedChecksumsSources;
this.closed = new AtomicBoolean( false );
- maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
+ maxThreads = ExecutorUtils.threadCount( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
persistedChecksums =
ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
ConfigurationProperties.PERSISTED_CHECKSUMS );
}
- private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
+ private Executor getExecutor( int tasks )
{
if ( maxThreads <= 1 )
{
- return DirectExecutor.INSTANCE;
+ return ExecutorUtils.DIRECT_EXECUTOR;
}
- int tasks = safe( artifacts ).size() + safe( metadatas ).size();
if ( tasks <= 1 )
{
- return DirectExecutor.INSTANCE;
+ return ExecutorUtils.DIRECT_EXECUTOR;
}
if ( executor == null )
{
- executor =
- new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new WorkerThreadFactory( getClass().getSimpleName() + '-'
- + repository.getHost() + '-' ) );
+ executor = ExecutorUtils.threadPool( maxThreads,
+ getClass().getSimpleName() + '-' + repository.getHost() + '-' );
}
return executor;
}
@@ -193,10 +185,7 @@ final class BasicRepositoryConnector
{
if ( closed.compareAndSet( false, true ) )
{
- if ( executor instanceof ExecutorService )
- {
- ( (ExecutorService) executor ).shutdown();
- }
+ ExecutorUtils.shutdown( executor );
transporter.close();
}
}
@@ -215,11 +204,14 @@ final class BasicRepositoryConnector
{
failIfClosed();
- Executor executor = getExecutor( artifactDownloads, metadataDownloads );
+ Collection<? extends ArtifactDownload> safeArtifactDownloads = safe( artifactDownloads );
+ Collection<? extends MetadataDownload> safeMetadataDownloads = safe( metadataDownloads );
+
+ Executor executor = getExecutor( safeArtifactDownloads.size() + safeMetadataDownloads.size() );
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
- for ( MetadataDownload transfer : safe( metadataDownloads ) )
+ for ( MetadataDownload transfer : safeMetadataDownloads )
{
URI location = layout.getLocation( transfer.getMetadata(), false );
@@ -239,7 +231,7 @@ final class BasicRepositoryConnector
executor.execute( errorForwarder.wrap( task ) );
}
- for ( ArtifactDownload transfer : safe( artifactDownloads ) )
+ for ( ArtifactDownload transfer : safeArtifactDownloads )
{
Map<String, String> providedChecksums = Collections.emptyMap();
for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() )
@@ -289,7 +281,10 @@ final class BasicRepositoryConnector
{
failIfClosed();
- for ( ArtifactUpload transfer : safe( artifactUploads ) )
+ Collection<? extends ArtifactUpload> safeArtifactUploads = safe( artifactUploads );
+ Collection<? extends MetadataUpload> safeMetadataUploads = safe( metadataUploads );
+
+ for ( ArtifactUpload transfer : safeArtifactUploads )
{
URI location = layout.getLocation( transfer.getArtifact(), true );
@@ -302,10 +297,11 @@ final class BasicRepositoryConnector
Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(),
checksumLocations, listener );
+
task.run();
}
- for ( MetadataUpload transfer : safe( metadataUploads ) )
+ for ( MetadataUpload transfer : safeMetadataUploads )
{
URI location = layout.getLocation( transfer.getMetadata(), true );
@@ -317,6 +313,7 @@ final class BasicRepositoryConnector
layout.getChecksumLocations( transfer.getMetadata(), true, location );
Runnable task = new PutTaskRunner( location, transfer.getFile(), checksumLocations, listener );
+
task.run();
}
}
@@ -622,18 +619,4 @@ final class BasicRepositoryConnector
}
}
-
- private static class DirectExecutor
- implements Executor
- {
-
- static final Executor INSTANCE = new DirectExecutor();
-
- @Override
- public void execute( Runnable command )
- {
- command.run();
- }
-
- }
}
diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java
index e5604834..7585c73a 100644
--- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java
+++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java
@@ -28,10 +28,6 @@ import java.util.List;
import java.util.Map;
import static java.util.Objects.requireNonNull;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
@@ -71,9 +67,8 @@ import org.eclipse.aether.transfer.MetadataNotFoundException;
import org.eclipse.aether.transfer.MetadataTransferException;
import org.eclipse.aether.transfer.NoRepositoryConnectorException;
import org.eclipse.aether.transfer.RepositoryOfflineException;
-import org.eclipse.aether.util.ConfigUtils;
+import org.eclipse.aether.util.concurrency.ExecutorUtils;
import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
-import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
/**
*/
@@ -374,8 +369,9 @@ public class DefaultMetadataResolver
if ( !tasks.isEmpty() )
{
- int threads = ConfigUtils.getInteger( session, 4, CONFIG_PROP_THREADS );
- Executor executor = getExecutor( Math.min( tasks.size(), threads ) );
+ int threads = ExecutorUtils.threadCount( session, 4, CONFIG_PROP_THREADS );
+ Executor executor = ExecutorUtils.executor(
+ Math.min( tasks.size(), threads ), getClass().getSimpleName() + '-' );
try
{
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
@@ -407,7 +403,7 @@ public class DefaultMetadataResolver
}
finally
{
- shutdown( executor );
+ ExecutorUtils.shutdown( executor );
}
for ( ResolveTask task : tasks )
{
@@ -531,27 +527,6 @@ public class DefaultMetadataResolver
repositoryEventDispatcher.dispatch( event.build() );
}
- private Executor getExecutor( int threads )
- {
- if ( threads <= 1 )
- {
- return command -> command.run();
- }
- else
- {
- return new ThreadPoolExecutor( threads, threads, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
- new WorkerThreadFactory( null ) );
- }
- }
-
- private void shutdown( Executor executor )
- {
- if ( executor instanceof ExecutorService )
- {
- ( (ExecutorService) executor ).shutdown();
- }
- }
-
class ResolveTask
implements Runnable
{
diff --git a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java
index c6e63e56..209de197 100644
--- a/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java
+++ b/maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java
@@ -37,9 +37,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -74,7 +71,7 @@ import org.eclipse.aether.resolution.VersionRangeResult;
import org.eclipse.aether.spi.locator.Service;
import org.eclipse.aether.util.ConfigUtils;
import org.eclipse.aether.util.artifact.ArtifactIdUtils;
-import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
+import org.eclipse.aether.util.concurrency.ExecutorUtils;
import org.eclipse.aether.util.graph.manager.DependencyManagerUtils;
import org.eclipse.aether.version.Version;
@@ -145,7 +142,8 @@ public class BfDependencyCollector
boolean useSkip = ConfigUtils.getBoolean(
session, CONFIG_PROP_SKIPPER_DEFAULT, CONFIG_PROP_SKIPPER
);
- int nThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
+ int nThreads = ExecutorUtils.threadCount(
+ session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
logger.debug( "Using thread pool with {} threads to resolve descriptors.", nThreads );
if ( useSkip )
@@ -485,8 +483,7 @@ public class BfDependencyCollector
ParallelDescriptorResolver( int threads )
{
- this.executorService = new ThreadPoolExecutor( threads, threads, 3L, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), new WorkerThreadFactory( getClass().getSimpleName() + "-" ) );
+ this.executorService = ExecutorUtils.threadPool( threads, getClass().getSimpleName() + "-" );
}
void resolveDescriptors( Artifact artifact, Callable<DescriptorResolutionResult> callable )
diff --git a/maven-resolver-util/src/main/java/org/eclipse/aether/util/concurrency/ExecutorUtils.java b/maven-resolver-util/src/main/java/org/eclipse/aether/util/concurrency/ExecutorUtils.java
new file mode 100644
index 00000000..716cfecb
--- /dev/null
+++ b/maven-resolver-util/src/main/java/org/eclipse/aether/util/concurrency/ExecutorUtils.java
@@ -0,0 +1,109 @@
+package org.eclipse.aether.util.concurrency;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.eclipse.aether.RepositorySystemSession;
+import org.eclipse.aether.util.ConfigUtils;
+
+/**
+ * Utilities for executors and sizing them.
+ *
+ * @since 1.9.5
+ */
+public final class ExecutorUtils
+{
+ /**
+ * Shared instance of "direct executor".
+ */
+ public static final Executor DIRECT_EXECUTOR = Runnable::run;
+
+ /**
+ * Creates new thread pool {@link ExecutorService}. The {@code poolSize} parameter but be greater than 1.
+ */
+ public static ExecutorService threadPool( int poolSize, String namePrefix )
+ {
+ if ( poolSize < 2 )
+ {
+ throw new IllegalArgumentException(
+ "Invalid poolSize: " + poolSize + ". Must be greater than 1." );
+ }
+ return new ThreadPoolExecutor( poolSize, poolSize, 3L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new WorkerThreadFactory( namePrefix )
+ );
+ }
+
+ /**
+ * Returns {@link #DIRECT_EXECUTOR} or result of {@link #threadPool(int, String)} depending on value of
+ * {@code size} parameter.
+ */
+ public static Executor executor( int size, String namePrefix )
+ {
+ if ( size <= 1 )
+ {
+ return DIRECT_EXECUTOR;
+ }
+ else
+ {
+ return threadPool( size, namePrefix );
+ }
+ }
+
+ /**
+ * To be used with result of {@link #executor(int, String)} method, shuts down instance if it is
+ * {@link ExecutorService}.
+ */
+ public static void shutdown( Executor executor )
+ {
+ if ( executor instanceof ExecutorService )
+ {
+ ( (ExecutorService) executor ).shutdown();
+ }
+ }
+
+ /**
+ * Retrieves and validates requested thread count based on session and specified keys, or if none provided, the
+ * provided default value. This method validates result on top of what {@link ConfigUtils} does.
+ *
+ * @throws IllegalArgumentException if default value is less than 1.
+ * @see ConfigUtils#getInteger(RepositorySystemSession, int, String...)
+ */
+ public static int threadCount( RepositorySystemSession session, int defaultValue, String... keys )
+ {
+ if ( defaultValue < 1 )
+ {
+ throw new IllegalArgumentException(
+ "Invalid defaultValue: " + defaultValue + ". Must be greater than 0." );
+ }
+ int threadCount = ConfigUtils.getInteger( session, defaultValue, keys );
+ if ( threadCount < 1 )
+ {
+ throw new IllegalArgumentException(
+ "Invalid value: " + threadCount + ". Must be greater than 0." );
+ }
+ return threadCount;
+ }
+}