You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/08/07 06:29:13 UTC
[incubator-druid] branch master updated: Use Closer instead of
List (#8235)
This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7702005 Use Closer instead of List<Closeable> (#8235)
7702005 is described below
commit 7702005f8f8a8f3a81df44d3f7a7957cc58e7418
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Wed Aug 7 08:29:03 2019 +0200
Use Closer instead of List<Closeable> (#8235)
* Use Closer instead of List<Closeable>
* Process comments
* Catch an Exception instead
* Removed unused import
---
.../apache/druid/java/util/common/io/Closer.java | 17 ++++++++++++---
.../epinephelinae/GroupByMergingQueryRunnerV2.java | 24 +++++++++++++---------
.../groupby/epinephelinae/GroupByRowProcessor.java | 17 ++++++++-------
3 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
index f5bfb2c..b17e52c 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
@@ -108,12 +108,11 @@ public final class Closer implements Closeable
}
/**
- * Registers the given {@code closeable} to be closed when this {@code Closer} is
+ * Registers the given {@code Closeable} to be closed when this {@code Closer} is
* {@linkplain #close closed}.
*
- * @return the given {@code closeable}
+ * @return the given {@code Closeable}
*/
- // close. this word no longer has any meaning to me.
public <C extends Closeable> C register(@Nullable C closeable)
{
if (closeable != null) {
@@ -124,6 +123,18 @@ public final class Closer implements Closeable
}
/**
+ * Registers a list of {@code Closeable} to be closed when this {@code Closer} is
+ * {@linkplain #close closed}.
+ *
+ * @return the supplied list of {@code Closeable}
+ */
+ public <C extends Closeable> Iterable<C> registerAll(Iterable<C> closeables)
+ {
+ closeables.forEach(this::register);
+ return closeables;
+ }
+
+ /**
* Stores the given throwable and rethrows it. It will be rethrown as is if it is an
* {@code IOException}, {@code RuntimeException} or {@code Error}. Otherwise, it will be rethrown
* wrapped in a {@code RuntimeException}. <b>Note:</b> Be sure to declare all of the checked
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index aa162db..398410b 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -39,8 +39,8 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.AbstractPrioritizedCallable;
import org.apache.druid.query.ChainedExecutionQueryRunner;
@@ -58,7 +58,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBas
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
@@ -166,7 +165,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
@Override
public CloseableGrouperIterator<RowBasedKey, ResultRow> make()
{
- final List<ReferenceCountingResourceHolder> resources = new ArrayList<>();
+ final Closer resources = Closer.create();
try {
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
@@ -175,7 +174,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
);
final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder =
ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
- resources.add(temporaryStorageHolder);
+ resources.register(temporaryStorageHolder);
// If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining
final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1;
@@ -185,7 +184,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
hasTimeout,
timeoutAt
);
- resources.addAll(mergeBufferHolders);
+ resources.registerAll(mergeBufferHolders);
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder = mergeBufferHolders.get(0);
final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder = numMergeBuffers == 2 ?
@@ -214,7 +213,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);
- resources.add(grouperHolder);
+ resources.register(grouperHolder);
ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
Lists.newArrayList(
@@ -280,13 +279,18 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
return RowBasedGrouperHelper.makeGrouperIterator(
grouper,
query,
- () -> Lists.reverse(resources).forEach(CloseQuietly::close)
+ resources
);
}
- catch (Throwable e) {
+ catch (Throwable t) {
// Exception caught while setting up the iterator; release resources.
- Lists.reverse(resources).forEach(CloseQuietly::close);
- throw e;
+ try {
+ resources.close();
+ }
+ catch (Exception ex) {
+ t.addSuppressed(ex);
+ }
+ throw t;
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
index b33064a..f86b2f0 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
@@ -21,14 +21,13 @@ package org.apache.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
@@ -40,8 +39,8 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -95,7 +94,7 @@ public class GroupByRowProcessor
final int mergeBufferSize
)
{
- final List<Closeable> closeOnExit = new ArrayList<>();
+ final Closer closeOnExit = Closer.create();
final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
final File temporaryStorageDirectory = new File(
@@ -108,7 +107,7 @@ public class GroupByRowProcessor
querySpecificConfig.getMaxOnDiskStorage()
);
- closeOnExit.add(temporaryStorage);
+ closeOnExit.register(temporaryStorage);
Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, ResultRow>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
@@ -120,7 +119,7 @@ public class GroupByRowProcessor
public ByteBuffer get()
{
final ResourceHolder<ByteBuffer> mergeBufferHolder = resource.getMergeBuffer();
- closeOnExit.add(mergeBufferHolder);
+ closeOnExit.register(mergeBufferHolder);
return mergeBufferHolder.get();
}
},
@@ -130,7 +129,7 @@ public class GroupByRowProcessor
);
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<AggregateResult, ResultRow> accumulator = pair.rhs;
- closeOnExit.add(grouper);
+ closeOnExit.register(grouper);
final AggregateResult retVal = rows.accumulate(AggregateResult.ok(), accumulator);
@@ -147,9 +146,9 @@ public class GroupByRowProcessor
}
@Override
- public void close()
+ public void close() throws IOException
{
- Lists.reverse(closeOnExit).forEach(CloseQuietly::close);
+ closeOnExit.close();
}
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org