You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@commons.apache.org by bo...@apache.org on 2019/08/18 15:57:57 UTC
[commons-compress] branch master updated: COMPRESS-485 bring back
the old submit method - backwards compatibility
This is an automated email from the ASF dual-hosted git repository.
bodewig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-compress.git
The following commit(s) were added to refs/heads/master by this push:
new 5a19091 COMPRESS-485 bring back the old submit method - backwards compatibility
5a19091 is described below
commit 5a190918e21e34c4690c5010ca86040dacd0d481
Author: Stefan Bodewig <bo...@apache.org>
AuthorDate: Sun Aug 18 17:57:17 2019 +0200
COMPRESS-485 bring back the old submit method - backwards compatibility
---
.../archivers/zip/ParallelScatterZipCreator.java | 41 ++++++++++++++-----
.../zip/ParallelScatterZipCreatorTest.java | 47 +++++++++++++++++++---
2 files changed, 72 insertions(+), 16 deletions(-)
diff --git a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
index dd95882..dfa8524 100644
--- a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
+++ b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
@@ -54,7 +54,7 @@ public class ParallelScatterZipCreator {
private final Deque<ScatterZipOutputStream> streams = new ConcurrentLinkedDeque<>();
private final ExecutorService es;
private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
- private final Deque<Future<ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
+ private final Deque<Future<? extends ScatterZipOutputStream>> futures = new ConcurrentLinkedDeque<>();
private final long startedAt = System.currentTimeMillis();
private long compressionDoneAt = 0;
@@ -133,7 +133,7 @@ public class ParallelScatterZipCreator {
*/
public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
- submit(createCallable(zipArchiveEntry, source));
+ submitStreamAwareCallable(createCallable(zipArchiveEntry, source));
}
/**
@@ -146,7 +146,7 @@ public class ParallelScatterZipCreator {
* @since 1.13
*/
public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
- submit(createCallable(zipArchiveEntryRequestSupplier));
+ submitStreamAwareCallable(createCallable(zipArchiveEntryRequestSupplier));
}
/**
@@ -156,7 +156,25 @@ public class ParallelScatterZipCreator {
*
* @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
*/
- public final void submit(final Callable<ScatterZipOutputStream> callable) {
+ public final void submit(final Callable<? extends Object> callable) {
+ submitStreamAwareCallable(new Callable<ScatterZipOutputStream>() {
+ @Override
+ public ScatterZipOutputStream call() throws Exception {
+ callable.call();
+ return tlScatterStreams.get();
+ }
+ });
+ }
+
+ /**
+ * Submit a callable for compression.
+ *
+ * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
+ *
+ * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
+ * @since 1.19
+ */
+ public final void submitStreamAwareCallable(final Callable<? extends ScatterZipOutputStream> callable) {
futures.add(es.submit(callable));
}
@@ -165,20 +183,21 @@ public class ParallelScatterZipCreator {
*
* <p>This method is expected to be called from a single client thread.</p>
*
- * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submit submit}.
- * The most common use case for using {@link #createCallable createCallable} and {@link #submit submit} from a
+ * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submitStreamAwareCallable submitStreamAwareCallable}.
+ * The most common use case for using {@link #createCallable createCallable} and {@link #submitStreamAwareCallable submitStreamAwareCallable} from a
* client is if you want to wrap the callable in something that can be prioritized by the supplied
* {@link ExecutorService}, for instance to process large or slow files first.
* Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
*
* @param zipArchiveEntry The entry to add.
* @param source The source input stream supplier
- * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
+ * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
* value of this callable is not used, but any exceptions happening inside the compression
* will be propagated through the callable.
*/
- public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
+ public final Callable<ScatterZipOutputStream> createCallable(final ZipArchiveEntry zipArchiveEntry,
+ final InputStreamSupplier source) {
final int method = zipArchiveEntry.getMethod();
if (method == ZipMethod.UNKNOWN_CODE) {
throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
@@ -205,7 +224,7 @@ public class ParallelScatterZipCreator {
* @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
*
* @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
- * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
+ * @return A callable that should subsequently passed to #submitStreamAwareCallable, possibly in a wrapped/adapted from. The
* value of this callable is not used, but any exceptions happening inside the compression
* will be propagated through the callable.
* @since 1.13
@@ -229,7 +248,7 @@ public class ParallelScatterZipCreator {
* </p>
*
* <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link
- * Callable}s {@link #submit}ted to this instance throws an exception, the archive can not be created properly and
+ * Callable}s {@link #submitStreamAwareCallable submit}ted to this instance throws an exception, the archive can not be created properly and
* this method will throw an exception.</p>
*
* @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
@@ -255,7 +274,7 @@ public class ParallelScatterZipCreator {
// It is important that all threads terminate before we go on, ensure happens-before relationship
compressionDoneAt = System.currentTimeMillis();
- for (final Future<ScatterZipOutputStream> future : futures) {
+ for (final Future<? extends ScatterZipOutputStream> future : futures) {
ScatterZipOutputStream scatterStream = future.get();
scatterStream.zipEntryWriter().writeNextZipEntry(targetStream);
}
diff --git a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
index 7a46ede..f2417a9 100644
--- a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
+++ b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
@@ -70,9 +70,38 @@ public class ParallelScatterZipCreatorTest {
}
@Test
- public void callableApi()
- throws Exception {
+ public void callableApiUsingSubmit() throws Exception {
result = File.createTempFile("parallelScatterGather2", "");
+ callableApi(new CallableConsumerSupplier() {
+ @Override
+ public CallableConsumer apply(final ParallelScatterZipCreator zipCreator) {
+ return new CallableConsumer() {
+ @Override
+ public void accept(Callable<? extends ScatterZipOutputStream> c) {
+ zipCreator.submit(c);
+ }
+ };
+ }
+ });
+ }
+
+ @Test
+ public void callableApiUsingSubmitStreamAwareCallable() throws Exception {
+ result = File.createTempFile("parallelScatterGather3", "");
+ callableApi(new CallableConsumerSupplier() {
+ @Override
+ public CallableConsumer apply(final ParallelScatterZipCreator zipCreator) {
+ return new CallableConsumer() {
+ @Override
+ public void accept(Callable<? extends ScatterZipOutputStream> c) {
+ zipCreator.submitStreamAwareCallable(c);
+ }
+ };
+ }
+ });
+ }
+
+ private void callableApi(CallableConsumerSupplier consumerSupplier) throws Exception {
final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
zos.setEncoding("UTF-8");
final ExecutorService es = Executors.newFixedThreadPool(1);
@@ -85,7 +114,7 @@ public class ParallelScatterZipCreatorTest {
};
final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp);
- final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator);
+ final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
zipCreator.writeTo(zos);
zos.close();
@@ -137,7 +166,8 @@ public class ParallelScatterZipCreatorTest {
return entries;
}
- private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator) {
+ private Map<String, byte[]> writeEntriesAsCallable(final ParallelScatterZipCreator zipCreator,
+ final CallableConsumer consumer) {
final Map<String, byte[]> entries = new HashMap<>();
for (int i = 0; i < NUMITEMS; i++){
final byte[] payloadBytes = ("content" + i).getBytes();
@@ -161,7 +191,7 @@ public class ParallelScatterZipCreatorTest {
callable = zipCreator.createCallable(zaSupplier);
}
- zipCreator.submit(callable);
+ consumer.accept(callable);
}
return entries;
}
@@ -174,4 +204,11 @@ public class ParallelScatterZipCreatorTest {
za.setUnixMode(UnixStat.FILE_FLAG | 0664);
return za;
}
+
+ private interface CallableConsumer {
+ void accept(Callable<? extends ScatterZipOutputStream> c);
+ }
+ private interface CallableConsumerSupplier {
+ CallableConsumer apply(ParallelScatterZipCreator zipCreator);
+ }
}