You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@maven.apache.org by GitBox <gi...@apache.org> on 2018/11/14 20:50:09 UTC

[GitHub] asfgit closed pull request #51: WAGON-537 Maven transfer speed of large artifacts is slow

asfgit closed pull request #51: WAGON-537 Maven transfer speed of large artifacts is slow
URL: https://github.com/apache/maven-wagon/pull/51
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
index 4cbf37d7..361390a4 100644
--- a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
+++ b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
@@ -42,8 +42,14 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
 /**
  * Implementation of common facilities for Wagon providers.
  *
@@ -53,6 +59,24 @@
     implements Wagon
 {
     protected static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+    protected static final int MAXIMUM_BUFFER_SIZE = 1024 * 512;
+
+    /**
+     * To efficiently buffer data, use a multiple of 4k
+     * as this is likely to match the hardware buffer size of certain
+     * storage devices.
+     */
+    protected static final int BUFFER_SEGMENT_SIZE = 4 * 1024;
+
+    /**
+     * The desired minimum amount of chunks in which a {@link Resource} shall be
+     * {@link #transfer(Resource, InputStream, OutputStream, int, long) transferred}.
+     * This corresponds to the minimum times {@link #fireTransferProgress(TransferEvent, byte[], int)}.
+     * 100 notifications is a conservative value that will lead to small chunks for
+     * any artifact less that {@link #BUFFER_SEGMENT_SIZE} * {@link #MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS}
+     * in size.
+     */
+    protected static final int MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS = 100;
 
     protected Repository repository;
 
@@ -560,31 +584,74 @@ protected void transfer( Resource resource, InputStream input, OutputStream outp
     protected void transfer( Resource resource, InputStream input, OutputStream output, int requestType, long maxSize )
         throws IOException
     {
-        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
+        ByteBuffer buffer = ByteBuffer.allocate( getBufferCapacityForTransferring( resource.getContentLength() ) );
+        int halfBufferCapacity = buffer.capacity() / 2;
 
         TransferEvent transferEvent = new TransferEvent( this, resource, TransferEvent.TRANSFER_PROGRESS, requestType );
         transferEvent.setTimestamp( System.currentTimeMillis() );
 
+        ReadableByteChannel in = Channels.newChannel( input );
+
         long remaining = maxSize;
         while ( remaining > 0 )
         {
-            // let's safely cast to int because the min value will be lower than the buffer size.
-            int n = input.read( buffer, 0, (int) Math.min( buffer.length, remaining ) );
+            int read = in.read( buffer );
 
-            if ( n == -1 )
+            if ( read == -1 )
             {
+                // EOF, but some data has not been written yet.
+                if ( buffer.position() != 0 )
+                {
+                    buffer.flip();
+                    fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+                    output.write( buffer.array(), 0, buffer.limit() );
+                }
+
                 break;
             }
 
-            fireTransferProgress( transferEvent, buffer, n );
-
-            output.write( buffer, 0, n );
+            // Prevent minichunking / fragmentation: when less than half the buffer is utilized,
+            // read some more bytes before writing and firing progress.
+            if ( buffer.position() < halfBufferCapacity  )
+            {
+                continue;
+            }
 
-            remaining -= n;
+            buffer.flip();
+            fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+            output.write( buffer.array(), 0, buffer.limit() );
+            remaining -= buffer.limit();
+            buffer.clear();
         }
         output.flush();
     }
 
+    /**
+     * Provides a buffer size for efficiently transferring the given amount of bytes, such that
+     * it is not fragmented into to many chunks. For larger files, larger buffers are provided such that downstream
+     * {@link #fireTransferProgress(TransferEvent, byte[], int) listeners} are not notified overly frequently.
+     * For instance, transferring gigabyte-sized resources would result in millions of notifications when using
+     * only a few kilobytes of buffer, drastically slowing transfer since transfer progress listeners and
+     * notifications are synchronous and may block, e.g. when writing download progress status to console.
+     *
+     * @param numberOfBytes can be 0 or less, in which case a default buffer size is used.
+     * @return a byte buffer suitable for transferring the given amount of bytes without too many chunks.
+     */
+    protected int getBufferCapacityForTransferring(long numberOfBytes )
+    {
+        if ( numberOfBytes <= 0 )
+        {
+            return DEFAULT_BUFFER_SIZE;
+        }
+
+        final int numberOfBufferSegments = ( ( int ) (
+            numberOfBytes / ( BUFFER_SEGMENT_SIZE * MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS ) )
+        );
+        final int potentialBufferSize = numberOfBufferSegments * BUFFER_SEGMENT_SIZE;
+        return min( MAXIMUM_BUFFER_SIZE, max( DEFAULT_BUFFER_SIZE, potentialBufferSize ) );
+    }
+
     // ----------------------------------------------------------------------
     //
     // ----------------------------------------------------------------------
diff --git a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
index 9f294f7e..d87cc857 100755
--- a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
+++ b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
@@ -84,6 +84,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -105,9 +109,6 @@
     private final class RequestEntityImplementation
         extends AbstractHttpEntity
     {
-
-        private static final int BUFFER_SIZE = 2048;
-
         private final Resource resource;
 
         private final Wagon wagon;
@@ -170,42 +171,47 @@ public void writeTo( final OutputStream outputStream )
             TransferEvent transferEvent =
                 new TransferEvent( wagon, resource, TransferEvent.TRANSFER_PROGRESS, TransferEvent.REQUEST_PUT );
             transferEvent.setTimestamp( System.currentTimeMillis() );
-            InputStream instream = ( this.source != null )
-                ? new FileInputStream( this.source )
-                : stream;
-            try
+
+            try ( ReadableByteChannel input = ( this.source != null )
+                    ? new RandomAccessFile( this.source, "r" ).getChannel()
+                    : Channels.newChannel( stream ) )
             {
-                byte[] buffer = new byte[BUFFER_SIZE];
-                int l;
-                if ( this.length < 0 )
-                {
-                    // until EOF
-                    while ( ( l = instream.read( buffer ) ) != -1 )
-                    {
-                        fireTransferProgress( transferEvent, buffer, -1 );
-                        outputStream.write( buffer, 0, l );
-                    }
-                }
-                else
+                ByteBuffer buffer = ByteBuffer.allocate( getBufferCapacityForTransferring( this.length ) );
+                int halfBufferCapacity = buffer.capacity() / 2;
+
+                long remaining = this.length < 0 ? Long.MAX_VALUE : this.length;
+                while ( remaining > 0 )
                 {
-                    // no need to consume more than length
-                    long remaining = this.length;
-                    while ( remaining > 0 )
+                    int read = input.read( buffer );
+                    if ( read == -1 )
                     {
-                        l = instream.read( buffer, 0, (int) Math.min( BUFFER_SIZE, remaining ) );
-                        if ( l == -1 )
+                        // EOF, but some data has not been written yet.
+                        if ( buffer.position() != 0 )
                         {
-                            break;
+                            buffer.flip();
+                            fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+                            outputStream.write( buffer.array(), 0, buffer.limit() );
+                            buffer.clear();
                         }
-                        fireTransferProgress( transferEvent, buffer, (int) Math.min( BUFFER_SIZE, remaining ) );
-                        outputStream.write( buffer, 0, l );
-                        remaining -= l;
+
+                        break;
                     }
+
+                    // Prevent minichunking / fragmentation: when less than half the buffer is utilized,
+                    // read some more bytes before writing and firing progress.
+                    if ( buffer.position() < halfBufferCapacity )
+                    {
+                        continue;
+                    }
+
+                    buffer.flip();
+                    fireTransferProgress( transferEvent, buffer.array(), buffer.limit() );
+                    outputStream.write( buffer.array(), 0, buffer.limit() );
+                    remaining -= buffer.limit();
+                    buffer.clear();
+
                 }
-            }
-            finally
-            {
-                instream.close();
+                outputStream.flush();
             }
         }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services