You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by bo...@apache.org on 2019/08/18 15:57:57 UTC

[commons-compress] branch master updated: COMPRESS-485 bring back the old submit method - backwards compatibility

This is an automated email from the ASF dual-hosted git repository.

bodewig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-compress.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a19091  COMPRESS-485 bring back the old submit method - backwards compatibility
5a19091 is described below

commit 5a190918e21e34c4690c5010ca86040dacd0d481
Author: Stefan Bodewig <bo...@apache.org>
AuthorDate: Sun Aug 18 17:57:17 2019 +0200

    COMPRESS-485 bring back the old submit method - backwards compatibility
---
 .../archivers/zip/ParallelScatterZipCreator.java   | 41 ++++++++++++++-----
 .../zip/ParallelScatterZipCreatorTest.java         | 47 +++++++++++++++++++---
 2 files changed, 72 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
index dd95882..dfa8524 100644
--- a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
+++ b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
@@ -54,7 +54,7 @@ public class ParallelScatterZipCreator {
     private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
     private final ExecutorService es;
     private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
-    private final Deque<Future<ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
+    private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
 
     private final long startedAt = System.currentTimeMillis();
     private long compressionDoneAt = 0;
@@ -133,7 +133,7 @@ public class ParallelScatterZipCreator {
      */
 
     public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
-        submit(createCallable(zipArchiveEntry, source));
+        submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
     }
 
     /**
@@ -146,7 +146,7 @@ public class ParallelScatterZipCreator {
      * @since 1.13
      */
     public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
-        submit(createCallable(zipArchiveEntryRequestSupplier));
+        submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
     }
 
     /**
@@ -156,7 +156,25 @@ public class ParallelScatterZipCreator {
      *
      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
      */
-    public final void submit(final Callable<ScatterZipOutputStream> callable) {
+    public final void submit(final Callable<? extends Object> callable) {
+        submitStreamAwareCallable(new Callable<ScatterZipOutputStream>() {
+            @Override
+            public ScatterZipOutputStream call() throws Exception {
+                callable.call();
+                return tlScatterStreams.get();
+            }
+        });
+    }
+
+    /**
+     * Submit a callable for compression.
+     *
+     * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
+     *
+     * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
+     * @since 1.19
+     */
+    public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
         futures.add(es.submit(callable));
     }
 
@@ -165,20 +183,21 @@ public class ParallelScatterZipCreator {
      *
      * <p>This method is expected to be called from a single client thread.</p>
      *
-     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submit submit}.
-     * The most common use case for using {@link #createCallable createCallable} and {@link #submit submit} from a
+     * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}.
+     * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a
      * client is if you want to wrap the callable in something that can be prioritized by the supplied
      * {@link ExecutorService}, for instance to process large or slow files first.
      * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
      *
      * @param zipArchiveEntry The entry to add.
      * @param source          The source input stream supplier
-     * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
+     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
      * value of this callable is not used, but any exceptions happening inside the compression
      * will be propagated through the callable.
      */
 
-    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
+    public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
+        final InputStreamSupplier source) {
         final int method = zipArchiveEntry.getMethod();
         if (method == ZipMethod.UNKNOWN_CODE) {
             throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
@@ -205,7 +224,7 @@ public class ParallelScatterZipCreator {
      * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
      *
      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
-     * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
+     * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
      * value of this callable is not used, but any exceptions happening inside the compression
      * will be propagated through the callable.
      * @since 1.13
@@ -229,7 +248,7 @@ public class ParallelScatterZipCreator {
      * </p>
      *
      * <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link
-     * Callable}s {@link #submit}ted to this instance throws an exception, the archive can not be created properly and
+     * Callable}s {@link #submitStreamAwareCallable submit}ted to this instance throws an exception, the archive can not be created properly and
      * this method will throw an exception.</p>
      *
      * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
@@ -255,7 +274,7 @@ public class ParallelScatterZipCreator {
             // It is important that all threads terminate before we go on, ensure happens-before relationship
             compressionDoneAt = System.currentTimeMillis();
 
-            for (final Future<ScatterZipOutputStream> future : futures) {
+            for (final Future<? extends ScatterZipOutputStream> future : futures) {
                 ScatterZipOutputStream scatterStream = future.get();
                 scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
             }
diff --git a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
index 7a46ede..f2417a9 100644
--- a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
+++ b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
@@ -70,9 +70,38 @@ public class ParallelScatterZipCreatorTest {
     }
 
     @Test
-    public void callableApi()
-            throws Exception {
+    public void callableApiUsingSubmit() throws Exception {
         result = File.createTempFile("parallelScatterGather2", "");
+        callableApi(new CallableConsumerSupplier() {
+            @Override
+            public CallableConsumer apply(final ParallelScatterZipCreator zipCreator) {
+                return new CallableConsumer() {
+                    @Override
+                    public void accept(Callable<? extends ScatterZipOutputStream> c) {
+                        zipCreator.submit(c);
+                    }
+                };
+            }
+        });
+    }
+
+    @Test
+    public void callableApiUsingSubmitStreamAwareCallable() throws Exception {
+        result = File.createTempFile("parallelScatterGather3", "");
+        callableApi(new CallableConsumerSupplier() {
+            @Override
+            public CallableConsumer apply(final ParallelScatterZipCreator zipCreator) {
+                return new CallableConsumer() {
+                    @Override
+                    public void accept(Callable<? extends ScatterZipOutputStream> c) {
+                        zipCreator.submitStreamAwareCallable(c);
+                    }
+                };
+            }
+        });
+    }
+
+    private void callableApi(CallableConsumerSupplier consumerSupplier) throws Exception {
         final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
         zos.setEncoding("UTF-8");
         final ExecutorService es = Executors.newFixedThreadPool(1);
@@ -85,7 +114,7 @@ public class ParallelScatterZipCreatorTest {
         };
 
         final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp);
-        final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator);
+        final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
         zipCreator.writeTo(zos);
         zos.close();
 
@@ -137,7 +166,8 @@ public class ParallelScatterZipCreatorTest {
         return entries;
     }
 
-    private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator) {
+    private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator,
+                                                       final CallableConsumer consumer) {
         final Map<String, byte[]> entries = new HashMap<>();
         for (int i = 0; i < NUMITEMS; i++){
             final byte[] payloadBytes = ("content" + i).getBytes();
@@ -161,7 +191,7 @@ public class ParallelScatterZipCreatorTest {
                 callable = zipCreator.createCallable(zaSupplier);
             }
 
-            zipCreator.submit(callable);
+            consumer.accept(callable);
         }
         return entries;
     }
@@ -174,4 +204,11 @@ public class ParallelScatterZipCreatorTest {
         za.setUnixMode(UnixStat.FILE_FLAG | 0664);
         return za;
     }
+
+    private interface CallableConsumer {
+        void accept(Callable<? extends ScatterZipOutputStream> c);
+    }
+    private interface CallableConsumerSupplier {
+        CallableConsumer apply(ParallelScatterZipCreator zipCreator);
+    }
 }