You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/08/07 06:29:13 UTC

[incubator-druid] branch master updated: Use Closer instead of List (#8235)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7702005  Use Closer instead of List<Closeable> (#8235)
7702005 is described below

commit 7702005f8f8a8f3a81df44d3f7a7957cc58e7418
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Wed Aug 7 08:29:03 2019 +0200

    Use Closer instead of List<Closeable> (#8235)
    
    * Use Closer instead of List<Closeable>
    
    * Process comments
    
    * Catch an Exception instead
    
    * Removed unused import
---
 .../apache/druid/java/util/common/io/Closer.java   | 17 ++++++++++++---
 .../epinephelinae/GroupByMergingQueryRunnerV2.java | 24 +++++++++++++---------
 .../groupby/epinephelinae/GroupByRowProcessor.java | 17 ++++++++-------
 3 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
index f5bfb2c..b17e52c 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
@@ -108,12 +108,11 @@ public final class Closer implements Closeable
   }
 
   /**
-   * Registers the given {@code closeable} to be closed when this {@code Closer} is
+   * Registers the given {@code Closeable} to be closed when this {@code Closer} is
    * {@linkplain #close closed}.
    *
-   * @return the given {@code closeable}
+   * @return the given {@code Closeable}
    */
-  // close. this word no longer has any meaning to me.
   public <C extends Closeable> C register(@Nullable C closeable)
   {
     if (closeable != null) {
@@ -124,6 +123,18 @@ public final class Closer implements Closeable
   }
 
   /**
+   * Registers a list of {@code Closeable} to be closed when this {@code Closer} is
+   * {@linkplain #close closed}.
+   *
+   * @return the supplied list of {@code Closeable}
+   */
+  public <C extends Closeable> Iterable<C> registerAll(Iterable<C> closeables)
+  {
+    closeables.forEach(this::register);
+    return closeables;
+  }
+
+  /**
    * Stores the given throwable and rethrows it. It will be rethrown as is if it is an
    * {@code IOException}, {@code RuntimeException} or {@code Error}. Otherwise, it will be rethrown
    * wrapped in a {@code RuntimeException}. <b>Note:</b> Be sure to declare all of the checked
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index aa162db..398410b 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -39,8 +39,8 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.AbstractPrioritizedCallable;
 import org.apache.druid.query.ChainedExecutionQueryRunner;
@@ -58,7 +58,6 @@ import org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBas
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
@@ -166,7 +165,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
           @Override
           public CloseableGrouperIterator<RowBasedKey, ResultRow> make()
           {
-            final List<ReferenceCountingResourceHolder> resources = new ArrayList<>();
+            final Closer resources = Closer.create();
 
             try {
               final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
@@ -175,7 +174,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
               );
               final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder =
                   ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
-              resources.add(temporaryStorageHolder);
+              resources.register(temporaryStorageHolder);
 
               // If parallelCombine is enabled, we need two merge buffers for parallel aggregating and parallel combining
               final int numMergeBuffers = querySpecificConfig.getNumParallelCombineThreads() > 1 ? 2 : 1;
@@ -185,7 +184,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
                   hasTimeout,
                   timeoutAt
               );
-              resources.addAll(mergeBufferHolders);
+              resources.registerAll(mergeBufferHolders);
 
               final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder = mergeBufferHolders.get(0);
               final ReferenceCountingResourceHolder<ByteBuffer> combineBufferHolder = numMergeBuffers == 2 ?
@@ -214,7 +213,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
 
               final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
                   ReferenceCountingResourceHolder.fromCloseable(grouper);
-              resources.add(grouperHolder);
+              resources.register(grouperHolder);
 
               ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
                   Lists.newArrayList(
@@ -280,13 +279,18 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
               return RowBasedGrouperHelper.makeGrouperIterator(
                   grouper,
                   query,
-                  () -> Lists.reverse(resources).forEach(CloseQuietly::close)
+                  resources
               );
             }
-            catch (Throwable e) {
+            catch (Throwable t) {
               // Exception caught while setting up the iterator; release resources.
-              Lists.reverse(resources).forEach(CloseQuietly::close);
-              throw e;
+              try {
+                resources.close();
+              }
+              catch (Exception ex) {
+                t.addSuppressed(ex);
+              }
+              throw t;
             }
           }
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
index b33064a..f86b2f0 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
@@ -21,14 +21,13 @@ package org.apache.druid.query.groupby.epinephelinae;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
@@ -40,8 +39,8 @@ import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
@@ -95,7 +94,7 @@ public class GroupByRowProcessor
       final int mergeBufferSize
   )
   {
-    final List<Closeable> closeOnExit = new ArrayList<>();
+    final Closer closeOnExit = Closer.create();
     final GroupByQueryConfig querySpecificConfig = config.withOverrides(query);
 
     final File temporaryStorageDirectory = new File(
@@ -108,7 +107,7 @@ public class GroupByRowProcessor
         querySpecificConfig.getMaxOnDiskStorage()
     );
 
-    closeOnExit.add(temporaryStorage);
+    closeOnExit.register(temporaryStorage);
 
     Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, ResultRow>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
         query,
@@ -120,7 +119,7 @@ public class GroupByRowProcessor
           public ByteBuffer get()
           {
             final ResourceHolder<ByteBuffer> mergeBufferHolder = resource.getMergeBuffer();
-            closeOnExit.add(mergeBufferHolder);
+            closeOnExit.register(mergeBufferHolder);
             return mergeBufferHolder.get();
           }
         },
@@ -130,7 +129,7 @@ public class GroupByRowProcessor
     );
     final Grouper<RowBasedKey> grouper = pair.lhs;
     final Accumulator<AggregateResult, ResultRow> accumulator = pair.rhs;
-    closeOnExit.add(grouper);
+    closeOnExit.register(grouper);
 
     final AggregateResult retVal = rows.accumulate(AggregateResult.ok(), accumulator);
 
@@ -147,9 +146,9 @@ public class GroupByRowProcessor
       }
 
       @Override
-      public void close()
+      public void close() throws IOException
       {
-        Lists.reverse(closeOnExit).forEach(CloseQuietly::close);
+        closeOnExit.close();
       }
     };
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org