Posted to commits@drill.apache.org by vo...@apache.org on 2018/05/05 19:07:14 UTC

[drill] branch master updated (4c4953b -> 6cbba28)

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 4c4953b  DRILL-6094: Decimal data type enhancements
     new ba5a921  DRILL-6318: Push down limit past flatten is incorrect
     new 21733b0  DRILL-6347: Change method names to "visitField".
     new b913b3f  DRILL-6281: Refactor TimedRunnable (rename TimedRunnable to TimedCallable)
     new ac96381  DRILL-6281: Refactor TimedRunnable
     new d0a7545  DRILL-6281: Introduce Collectors class for internal iterators
     new 6cbba28  DRILL-6380: Fix sporadic mongo db hangs.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../drill/common/collections/Collectors.java       | 123 ++++++++++
 .../drill/exec/store/mongo/MongoTestSuit.java      |  12 +-
 .../planner/logical/DrillPushLimitToScanRule.java  |  13 +-
 .../physical/visitor/PrelVisualizerVisitor.java    |  16 +-
 .../org/apache/drill/exec/store/TimedCallable.java | 261 +++++++++++++++++++++
 .../org/apache/drill/exec/store/TimedRunnable.java | 208 ----------------
 .../drill/exec/store/parquet/FooterGatherer.java   |  20 +-
 .../exec/store/parquet/metadata/Metadata.java      |  33 ++-
 .../drill/exec/store/schedule/BlockMapBuilder.java |  23 +-
 .../test/java/org/apache/drill/TestBugFixes.java   |  13 +
 .../physical/impl/flatten/TestFlattenPlanning.java |   5 +-
 ...stTimedRunnable.java => TestTimedCallable.java} |  59 +++--
 .../src/test/resources/jsoninput/bug6318.json      |  12 +
 13 files changed, 497 insertions(+), 301 deletions(-)
 create mode 100644 common/src/main/java/org/apache/drill/common/collections/Collectors.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
 delete mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
 rename exec/java-exec/src/test/java/org/apache/drill/exec/store/{TestTimedRunnable.java => TestTimedCallable.java} (61%)
 create mode 100644 exec/java-exec/src/test/resources/jsoninput/bug6318.json

-- 
To stop receiving notification emails like this one, please contact
volodymyr@apache.org.

[drill] 01/06: DRILL-6318: Push down limit past flatten is incorrect

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ba5a9215aeb891be938ba9997da8360f19699dc5
Author: Oleg <oz...@solit-clouds.ru>
AuthorDate: Tue Apr 10 11:26:25 2018 +0300

    DRILL-6318: Push down limit past flatten is incorrect
    
    closes #1204
---
 .../exec/planner/logical/DrillPushLimitToScanRule.java      | 13 ++++---------
 .../src/test/java/org/apache/drill/TestBugFixes.java        | 13 +++++++++++++
 .../exec/physical/impl/flatten/TestFlattenPlanning.java     |  5 +++--
 exec/java-exec/src/test/resources/jsoninput/bug6318.json    | 12 ++++++++++++
 4 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
index 2d33d38..79ba9b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushLimitToScanRule.java
@@ -67,9 +67,11 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
       // mess up the schema since Convert_FromJson() is different from other regular functions in that it only knows
       // the output schema after evaluation is performed. When input has 0 row, Drill essentially does not have a way
       // to know the output type.
+      // Cannot push down limit and offset into flatten as long as we don't know the data distribution in the flattened field
       if (!limitRel.isPushDown() && (limitRel.getFetch() != null)
           && (!DrillRelOptUtil.isLimit0(limitRel.getFetch())
-            || !DrillRelOptUtil.isProjectOutputSchemaUnknown(projectRel))) {
+            || !DrillRelOptUtil.isProjectOutputSchemaUnknown(projectRel))
+          && !DrillRelOptUtil.isProjectOutputRowcountUnknown(projectRel)) {
         return true;
       }
       return false;
@@ -82,14 +84,7 @@ public abstract class DrillPushLimitToScanRule extends RelOptRule {
       RelNode child = projectRel.getInput();
       final RelNode limitUnderProject = limitRel.copy(limitRel.getTraitSet(), ImmutableList.of(child));
       final RelNode newProject = projectRel.copy(projectRel.getTraitSet(), ImmutableList.of(limitUnderProject));
-      if (DrillRelOptUtil.isProjectOutputRowcountUnknown(projectRel)) {
-        //Preserve limit above the project since Flatten can produce more rows. Also mark it so we do not fire the rule again.
-        final RelNode limitAboveProject = new DrillLimitRel(limitRel.getCluster(), limitRel.getTraitSet(), newProject,
-            limitRel.getOffset(), limitRel.getFetch(), true);
-        call.transformTo(limitAboveProject);
-      } else {
-        call.transformTo(newProject);
-      }
+      call.transformTo(newProject);
     }
   };
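
The net effect: the rule now bails out whenever the project's output row count is unknown (i.e. it contains a flatten), rather than keeping a second limit above the project. A minimal sketch, not Drill code, of why a limit must not slide below a flatten (java.util.stream stands in for the relational operators):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    class LimitPastFlatten {
      public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
            Arrays.<String>asList(),              // rows with empty arrays vanish on flatten
            Arrays.asList("a", "b"),
            Arrays.asList("c", "d", "e", "f"));
        // Flatten first, then limit: the correct answer.
        System.out.println(rows.stream().flatMap(List::stream).limit(3)
            .collect(Collectors.toList()));       // [a, b, c]
        // Limit pushed below the flatten: a different (wrong) answer.
        System.out.println(rows.stream().limit(3).flatMap(List::stream)
            .collect(Collectors.toList()));       // [a, b, c, d, e, f]
      }
    }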
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 100d194..f22db7b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -22,6 +22,7 @@ import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.test.BaseTestQuery;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -300,4 +301,16 @@ public class TestBugFixes extends BaseTestQuery {
       test("ALTER SESSION RESET `planner.slice_target`");
     }
   }
+
+  @Test
+  public void testDRILL6318() throws Exception {
+    int rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json`");
+    Assert.assertEquals(11, rows);
+
+    rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3");
+    Assert.assertEquals(3, rows);
+
+    rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3 OFFSET 5");
+    Assert.assertEquals(3, rows);
+  }
 }
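
The expected counts follow directly from the sizes of the "data" arrays in bug6318.json below (0, 2, 0, 6, 0, 2, 0, 1, 0, 0). A quick standalone check of the arithmetic, outside of Drill:

    import java.util.Arrays;

    class Bug6318Counts {
      public static void main(String[] args) {
        int[] sizes = {0, 2, 0, 6, 0, 2, 0, 1, 0, 0}; // "data" array lengths, in file order
        int flattened = Arrays.stream(sizes).sum();
        System.out.println(flattened);                  // 11 rows after FLATTEN
        System.out.println(Math.min(3, flattened - 5)); // LIMIT 3 OFFSET 5 -> rows 6..8, i.e. 3
      }
    }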
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
index 0e2d92c..9731aa2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlattenPlanning.java
@@ -66,8 +66,9 @@ public class TestFlattenPlanning extends PlanTestBase {
   @Test // DRILL-6099 : push limit past flatten(project)
   public void testLimitPushdownPastFlatten() throws Exception {
     final String query = "select rownum, flatten(complex) comp from cp.`store/json/test_flatten_mappify2.json` limit 1";
-    final String[] expectedPatterns = {".*Limit\\(fetch=\\[1\\]\\).*",".*Flatten.*",".*Limit\\(fetch=\\[1\\]\\).*"};
-    final String[] excludedPatterns = null;
+    //DRILL-6318 : limit should not push past flatten(project)
+    final String[] expectedPatterns = {"(?s).*Limit.*Flatten.*Project.*"};
+    final String[] excludedPatterns = {"(?s).*Limit.*Flatten.*Limit.*"};
     PlanTestBase.testPlanMatchingPatterns(query, expectedPatterns, excludedPatterns);
   }
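
The new patterns rely on the (?s) inline flag (Pattern.DOTALL), which lets '.' match line terminators, so one regex can assert operator order across the whole multi-line plan. A self-contained illustration with a made-up plan string:

    class PlanPatternCheck {
      public static void main(String[] args) {
        String plan = "Limit(fetch=[1])\nFlatten\nProject\nScan";
        System.out.println(plan.matches("(?s).*Limit.*Flatten.*Project.*")); // true
        System.out.println(plan.matches("(?s).*Limit.*Flatten.*Limit.*"));   // false
      }
    }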
 
diff --git a/exec/java-exec/src/test/resources/jsoninput/bug6318.json b/exec/java-exec/src/test/resources/jsoninput/bug6318.json
new file mode 100644
index 0000000..1fdef8e
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/bug6318.json
@@ -0,0 +1,12 @@
+[
+	{ "name": "Helpless Templer", "data": [] },
+	{ "name": "Humble Grandma", "data": ["Honored Boy Scout", "Yawning Wolf"] },
+	{ "name": "Slow Stinger", "data": [] }, 
+	{ "name": "Slow Salesman", "data": ["Closed Queen", "Innocent Volunteer", "Junior Wing", "Lame Mantis", "Old Master", "Numb Pawn"] },
+	{ "name": "Mellow Tinkerbell", "data": [] },
+	{ "name": "Digital Mercury", "data": ["Hollow Guardian", "Twin Hurricane"] },
+	{ "name": "Last Beehive", "data": [] },
+	{ "name": "Infamous Balboa", "data": ["Helpless Avalange"] },
+	{ "name": "Cold Nurse", "data": [] },
+	{ "name": "Major Pawn", "data": [] }
+]
\ No newline at end of file


[drill] 04/06: DRILL-6281: Refactor TimedRunnable

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ac96381e455a6127b7abad2cf19508f160b136cd
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Sat Apr 21 08:07:42 2018 -0700

    DRILL-6281: Refactor TimedRunnable
---
 .../org/apache/drill/exec/store/TimedCallable.java | 334 ++++++++++++---------
 .../drill/exec/store/parquet/FooterGatherer.java   |  14 +-
 .../exec/store/parquet/metadata/Metadata.java      |  12 +-
 .../drill/exec/store/schedule/BlockMapBuilder.java |  17 +-
 .../apache/drill/exec/store/TestTimedCallable.java |  45 ++-
 5 files changed, 239 insertions(+), 183 deletions(-)
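
The heart of the refactoring shown in the diff below is replacing the hand-rolled latch/timeout machinery with ExecutorService.invokeAll, which blocks until every task has completed or the timeout has elapsed, returning unfinished tasks as cancelled futures. A minimal sketch of that pattern, with made-up tasks:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    class InvokeAllSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<Integer>> tasks = Arrays.asList(() -> 1, () -> 2, () -> 3);
        List<Future<Integer>> futures = pool.invokeAll(tasks, 15000, TimeUnit.MILLISECONDS);
        for (Future<Integer> f : futures) {
          if (!f.isCancelled()) {             // cancelled == timed out before finishing
            try {
              System.out.println(f.get());    // already done, so get() never blocks here
            } catch (ExecutionException e) {
              e.getCause().printStackTrace(); // the task itself failed
            }
          }
        }
        pool.shutdownNow();
      }
    }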

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
index 1afd716..ecc5579 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
@@ -19,190 +19,248 @@ package org.apache.drill.exec.store;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import com.google.common.base.Stopwatch;
-import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.UserException;
+
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
  * TODO: look at switching to fork join.
  * @param <V> The time value that will be returned when the task is executed.
  */
-public abstract class TimedCallable<V> implements Runnable {
+public abstract class TimedCallable<V> implements Callable<V> {
+  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
 
   private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
 
-  private volatile Exception e;
-  private volatile long threadStart;
-  private volatile long timeNanos;
-  private volatile V value;
+  private volatile long startTime = 0;
+  private volatile long executionTime = -1;
 
-  @Override
-  public final void run() {
-    long start = System.nanoTime();
-    threadStart=start;
-    try{
-      value = runInner();
-    }catch(Exception e){
-      this.e = e;
-    }finally{
-      timeNanos = System.nanoTime() - start;
-    }
-  }
+  private static class FutureMapper<V> implements Function<Future<V>, V> {
+    int count;
+    Throwable throwable = null;
 
-  protected abstract V runInner() throws Exception ;
-  protected abstract IOException convertToIOException(Exception e);
+    private void setThrowable(Throwable t) {
+      if (throwable == null) {
+        throwable = t;
+      } else {
+        throwable.addSuppressed(t);
+      }
+    }
 
-  public long getThreadStart(){
-    return threadStart;
-  }
-  public long getTimeSpentNanos(){
-    return timeNanos;
+    @Override
+    public V apply(Future<V> future) {
+      Preconditions.checkState(future.isDone());
+      if (!future.isCancelled()) {
+        try {
+          count++;
+          return future.get();
+        } catch (InterruptedException e) {
+          // there is no wait as we are getting result from the completed/done future
+          logger.error("Unexpected exception", e);
+          throw UserException.internalError(e)
+              .message("Unexpected exception")
+              .build(logger);
+        } catch (ExecutionException e) {
+          setThrowable(e.getCause());
+        }
+      } else {
+        setThrowable(new CancellationException());
+      }
+      return null;
+    }
   }
 
-  public final V getValue() throws IOException {
-    if(e != null){
-      if(e instanceof IOException){
-        throw (IOException) e;
-      }else{
-        throw convertToIOException(e);
+  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
+    final long start = System.nanoTime();
+    final Stopwatch watch = Stopwatch.createStarted();
+    long totalExecution;
+    long maxExecution;
+    int count;
+    int startedCount;
+    private int doneCount;
+    // measure thread creation times
+    long earliestStart;
+    long latestStart;
+    long totalStart;
+
+    @Override
+    public void accept(TimedCallable<V> task) {
+      count++;
+      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
+      if (threadStart >= 0) {
+        startedCount++;
+        earliestStart = Math.min(earliestStart, threadStart);
+        latestStart = Math.max(latestStart, threadStart);
+        totalStart += threadStart;
+        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
+        if (executionTime != -1) {
+          doneCount++;
+          totalExecution += executionTime;
+          maxExecution = Math.max(maxExecution, executionTime);
+        } else {
+          logger.info("Task {} started at {} did not finish", task, threadStart);
+        }
+      } else {
+        logger.info("Task {} never commenced execution", task);
       }
     }
 
-    return value;
-  }
+    Statistics<V> collect(final List<TimedCallable<V>> tasks) {
+      totalExecution = maxExecution = 0;
+      count = startedCount = doneCount = 0;
+      earliestStart = Long.MAX_VALUE;
+      latestStart = totalStart = 0;
+      tasks.forEach(this);
+      return this;
+    }
 
-  private static class LatchedRunnable implements Runnable {
-    final CountDownLatch latch;
-    final Runnable runnable;
+    void log(final String activity, final Logger logger, int parallelism) {
+      if (startedCount > 0) {
+        logger.debug("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).",
+            activity, startedCount, count, parallelism,
+            TimeUnit.NANOSECONDS.toMillis(earliestStart),
+            TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
+            TimeUnit.NANOSECONDS.toMillis(latestStart));
+      } else {
+        logger.debug("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism);
+      }
 
-    public LatchedRunnable(CountDownLatch latch, Runnable runnable){
-      this.latch = latch;
-      this.runnable = runnable;
+      if (doneCount > 0) {
+        logger.debug("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).",
+            activity, doneCount, count, parallelism, watch.elapsed(TimeUnit.MILLISECONDS),
+            TimeUnit.NANOSECONDS.toMillis(totalExecution) / doneCount, TimeUnit.NANOSECONDS.toMillis(maxExecution));
+      } else {
+        logger.debug("{}: completed {} out of {} using {} threads", activity, doneCount, count, parallelism);
+      }
     }
+  }
 
-    @Override
-    public void run() {
-      try{
-        runnable.run();
-      }finally{
-        latch.countDown();
+  @Override
+  public final V call() throws Exception {
+    long start = System.nanoTime();
+    startTime = start;
+    try {
+      logger.debug("Started execution of '{}' task at {} ms", this, TimeUnit.MILLISECONDS.convert(start, TimeUnit.NANOSECONDS));
+      return runInner();
+    } catch (InterruptedException e) {
+      logger.warn("Task '{}' interrupted", this, e);
+      throw e;
+    } finally {
+      long time = System.nanoTime() - start;
+      if (logger.isWarnEnabled()) {
+        long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
+        if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
+          logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
+        } else {
+          logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
+        }
       }
+      executionTime = time;
     }
   }
 
+  protected abstract V runInner() throws Exception;
+
+  private long getStartTime(TimeUnit unit) {
+    return unit.convert(startTime, TimeUnit.NANOSECONDS);
+  }
+
+  private long getExecutionTime(TimeUnit unit) {
+    return unit.convert(executionTime, TimeUnit.NANOSECONDS);
+  }
+
+
   /**
    * Execute the list of runnables with the given parallelization.  At end, return values and report completion time
   * stats to the provided logger. Each runnable is allowed a certain timeout; if the timeout is exceeded, existing/pending
    * tasks will be cancelled and a {@link UserException} is thrown.
    * @param activity Name of activity for reporting in logger.
    * @param logger The logger to use to report results.
-   * @param runnables List of runnables that should be executed and timed.  If this list has one item, task will be
-   *                  completed in-thread. Runnable must handle {@link InterruptedException}s.
+   * @param tasks List of callables that should be executed and timed.  If this list has one item, the task will be
+   *                  completed in-thread. Each callable must handle {@link InterruptedException}s.
    * @param parallelism  The number of threads that should be run to complete this task.
    * @return The list of outcome objects.
   * @throws IOException All exceptions are coerced to IOException since this was built for storage system tasks initially.
    */
-  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> runnables, int parallelism) throws IOException {
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    long timedRunnableStart=System.nanoTime();
-    if(runnables.size() == 1){
-      parallelism = 1;
-      runnables.get(0).run();
-    }else{
-      parallelism = Math.min(parallelism,  runnables.size());
-      final ExtendedLatch latch = new ExtendedLatch(runnables.size());
-      final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
-      try{
-        for(TimedCallable<V> runnable : runnables){
-          threadPool.submit(new LatchedRunnable(latch, runnable));
-        }
-
-        final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * runnables.size())/parallelism);
-        if (!latch.awaitUninterruptibly(timeout)) {
-          // Issue a shutdown request. This will cause existing threads to interrupt and pending threads to cancel.
-          // It is highly important that the task Runnables are handling interrupts correctly.
-          threadPool.shutdownNow();
-
-          try {
-            // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
-            // any running threads. If the runnables are handling the interrupts properly they should be able to
-            // wrap up and terminate. If not waiting for 5s here gives a chance to identify and log any potential
-            // thread leaks.
-            threadPool.awaitTermination(5, TimeUnit.SECONDS);
-          } catch (final InterruptedException e) {
-            logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
-          }
-
-          final String errMsg = String.format("Waited for %dms, but tasks for '%s' are not complete. " +
-              "Total runnable size %d, parallelism %d.", timeout, activity, runnables.size(), parallelism);
-          logger.error(errMsg);
-          throw UserException.resourceError()
-              .message(errMsg)
-              .build(logger);
-        }
-      } finally {
-        if (!threadPool.isShutdown()) {
-          threadPool.shutdown();
-        }
+  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism) throws IOException {
+    Preconditions.checkArgument(!Preconditions.checkNotNull(tasks).isEmpty(), "list of tasks is empty");
+    Preconditions.checkArgument(parallelism > 0);
+    parallelism = Math.min(parallelism, tasks.size());
+    final ExecutorService threadPool = parallelism == 1 ? MoreExecutors.newDirectExecutorService()
+        : Executors.newFixedThreadPool(parallelism, new ThreadFactoryBuilder().setNameFormat(activity + "-%d").build());
+    final long timeout = TIMEOUT_PER_RUNNABLE_IN_MSECS * ((tasks.size() - 1)/parallelism + 1);
+    final FutureMapper<V> futureMapper = new FutureMapper<>();
+    final Statistics<V> statistics = logger.isDebugEnabled() ? new Statistics<>() : null;
+    try {
+      return threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS)
+          .stream()
+          .map(futureMapper)
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
+    } catch (InterruptedException e) {
+      final String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity);
+      logger.error(errMsg, e);
+      throw UserException.resourceError(e)
+          .message(errMsg)
+          .build(logger);
+    } catch (RejectedExecutionException e) {
+      final String errMsg = String.format("Failure while submitting activity '%s' tasks for execution.", activity);
+      logger.error(errMsg, e);
+      throw UserException.internalError(e)
+          .message(errMsg)
+          .build(logger);
+    } finally {
+      List<Runnable> notStartedTasks = threadPool.shutdownNow();
+      if (!notStartedTasks.isEmpty()) {
+        logger.error("{} activity '{}' tasks never commenced execution.", notStartedTasks.size(), activity);
       }
-    }
-
-    List<V> values = Lists.newArrayList();
-    long sum = 0;
-    long max = 0;
-    long count = 0;
-    // measure thread creation times
-    long earliestStart=Long.MAX_VALUE;
-    long latestStart=0;
-    long totalStart=0;
-    IOException excep = null;
-    for(final TimedCallable<V> reader : runnables){
-      try{
-        values.add(reader.getValue());
-        sum += reader.getTimeSpentNanos();
-        count++;
-        max = Math.max(max, reader.getTimeSpentNanos());
-        earliestStart=Math.min(earliestStart, reader.getThreadStart() - timedRunnableStart);
-        latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
-        totalStart+=latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
-      }catch(IOException e){
-        if(excep == null){
-          excep = e;
-        }else{
-          excep.addSuppressed(e);
+      try {
+        // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
+        // any running threads. If the tasks are handling the interrupts properly they should be able to
+        // wrap up and terminate. If not, waiting for 5s here gives a chance to identify and log any potential
+        // thread leaks.
+        if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+          logger.error("Detected runaway tasks in activity '{}'.", activity);
         }
+      } catch (final InterruptedException e) {
+        logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
       }
-    }
-
-    if (watch != null) {
-      double avg = (sum/1000.0/1000.0)/(count*1.0d);
-      double avgStart = (totalStart/1000.0)/(count*1.0d);
-
-      logger.debug(
-          String.format("%s: Executed %d out of %d using %d threads. "
-              + "Time: %dms total, %fms avg, %dms max.",
-              activity, count, runnables.size(), parallelism, watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000));
-      logger.debug(
-              String.format("%s: Executed %d out of %d using %d threads. "
-                              + "Earliest start: %f \u03BCs, Latest start: %f \u03BCs, Average start: %f \u03BCs .",
-                      activity, count, runnables.size(), parallelism, earliestStart/1000.0, latestStart/1000.0, avgStart));
-      watch.stop();
-    }
 
-    if (excep != null) {
-      throw excep;
+      if (statistics != null) {
+        statistics.collect(tasks).log(activity, logger, parallelism);
+      }
+      if (futureMapper.count != tasks.size()) {
+        final String errMsg = String.format("Waited for %d ms, but only %d tasks for '%s' are complete." +
+            " Total number of tasks %d, parallelism %d.", timeout, futureMapper.count, activity, tasks.size(), parallelism);
+        logger.error(errMsg, futureMapper.throwable);
+        throw UserException.resourceError(futureMapper.throwable)
+            .message(errMsg)
+            .build(logger);
+      }
+      if (futureMapper.throwable != null) {
+        throw (futureMapper.throwable instanceof IOException) ?
+            (IOException)futureMapper.throwable : new IOException(futureMapper.throwable);
+      }
     }
-
-    return values;
-
   }
 }
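
One subtle improvement above: the overall timeout is now computed with exact integer ceiling division, timeout = TIMEOUT_PER_RUNNABLE_IN_MSECS * ceil(tasks / parallelism), written as (tasks - 1) / parallelism + 1. Worked through for the numbers used in TestTimedCallable further down:

    long perTask = 15000;                       // TIMEOUT_PER_RUNNABLE_IN_MSECS
    int tasks = 100, parallelism = 16;
    long timeout = perTask * ((tasks - 1) / parallelism + 1); // 15000 * 7 = 105000 ms

which is exactly the "Waited for 105000 ms" the updated test asserts (the old formula, (15000 * 100) / 16, gave the 93750 ms seen in the old assertion).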
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index 91219c7..ea34c7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -19,10 +19,12 @@ package org.apache.drill.exec.store.parquet;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -38,7 +40,8 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 
 public class FooterGatherer {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -66,8 +69,8 @@ public class FooterGatherer {
   }
 
   public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
-    final List<TimedCallable<Footer>> readers = Lists.newArrayList();
-    List<Footer> foundFooters = Lists.newArrayList();
+    final List<TimedCallable<Footer>> readers = new ArrayList<>();
+    final List<Footer> foundFooters = new ArrayList<>();
     for (FileStatus status : statuses) {
 
 
@@ -116,10 +119,9 @@ public class FooterGatherer {
     }
 
     @Override
-    protected IOException convertToIOException(Exception e) {
-      return new IOException("Failure while trying to get footer for file " + status.getPath(), e);
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", status.getPath()).toString();
     }
-
   }
 
   /**
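
With convertToIOException gone, tasks now identify themselves in log messages and wrapped exceptions via toString(). The commons-lang3 ToStringBuilder with SHORT_PREFIX_STYLE keeps that output compact (simple class name, no identity hash). A standalone illustration with a hypothetical task:

    import org.apache.commons.lang3.builder.ToStringBuilder;
    import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;

    class FooterTask {
      private final String path = "/data/part-0.parquet";  // made-up path

      @Override
      public String toString() {
        // prints as: FooterTask[path=/data/part-0.parquet]
        return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", path).toString();
      }
    }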
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 24bde1d..49a6b52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -27,6 +27,8 @@ import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
@@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
@@ -362,13 +365,8 @@ public class Metadata {
       return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs);
     }
 
-    @Override
-    protected IOException convertToIOException(Exception e) {
-      if (e instanceof IOException) {
-        return (IOException) e;
-      } else {
-        return new IOException(e);
-      }
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", fileStatus.getPath()).toString();
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 6bd2ede..fdc8ba3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -19,14 +19,17 @@ package org.apache.drill.exec.store.schedule;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.TimedCallable;
@@ -48,6 +51,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+
 public class BlockMapBuilder {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class);
   static final MetricRegistry metrics = DrillMetrics.getRegistry();
@@ -70,7 +75,7 @@ public class BlockMapBuilder {
 
   public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException {
 
-    List<TimedCallable<List<CompleteFileWork>>> readers = Lists.newArrayList();
+    List<TimedCallable<List<CompleteFileWork>>> readers = new ArrayList<>(files.size());
     for(FileStatus status : files){
       readers.add(new BlockMapReader(status, blockify));
     }
@@ -103,9 +108,9 @@ public class BlockMapBuilder {
 
     @Override
     protected List<CompleteFileWork> runInner() throws Exception {
-      final List<CompleteFileWork> work = Lists.newArrayList();
+      final List<CompleteFileWork> work = new ArrayList<>();
 
-      final Set<String> noDrillbitHosts = logger.isDebugEnabled() ? Sets.<String>newHashSet() : null;
+      final Set<String> noDrillbitHosts = logger.isDebugEnabled() ? new HashSet<>() : null;
 
       boolean error = false;
       if (blockify && !compressed(status)) {
@@ -143,12 +148,10 @@ public class BlockMapBuilder {
       return work;
     }
 
-
     @Override
-    protected IOException convertToIOException(Exception e) {
-      return new IOException("Failure while trying to get block map for " + status.getPath(), e);
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", status.getPath()).toString();
     }
-
   }
 
   private class FileStatusWork implements FileWork{
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
index a34383d..ea34230 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.store;
 
-import com.google.common.collect.Lists;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.test.TestTools;
 import org.apache.drill.test.DrillTest;
@@ -27,12 +27,13 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Unit testing for {@link TimedCallable}.
@@ -53,25 +54,22 @@ public class TestTimedCallable extends DrillTest {
 
     @Override
     protected Void runInner() throws Exception {
-      try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e) {
-        throw e;
-      }
+      Thread.sleep(sleepTime);
       return null;
     }
 
     @Override
-    protected IOException convertToIOException(Exception e) {
-      return new IOException("Failure while trying to sleep for sometime", e);
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("sleepTime", sleepTime).toString();
     }
   }
 
   @Test
   public void withoutAnyTasksTriggeringTimeout() throws Exception {
-    List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
+    int count = 100;
+    List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
 
-    for(int i=0; i<100; i++){
+    for (int i = 0; i < count; i++) {
       tasks.add(new TestTask(2000));
     }
 
@@ -80,12 +78,11 @@ public class TestTimedCallable extends DrillTest {
 
   @Test
   public void withTasksExceedingTimeout() throws Exception {
-    UserException ex = null;
-
     try {
-      List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
+      int count = 100;
+      List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
 
-      for (int i = 0; i < 100; i++) {
+      for (int i = 0; i < count; i++) {
         if ((i & (i + 1)) == 0) {
           tasks.add(new TestTask(2000));
         } else {
@@ -94,22 +91,20 @@ public class TestTimedCallable extends DrillTest {
       }
 
       TimedCallable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
+      fail("Expected a UserException");
     } catch (UserException e) {
-      ex = e;
+      assertThat(e.getMessage(),
+          containsString("Waited for 105000 ms, but only 87 tasks for 'Execution with some tasks triggering timeout' are " +
+              "complete. Total number of tasks 100, parallelism 16."));
     }
-
-    assertNotNull("Expected a UserException", ex);
-    assertThat(ex.getMessage(),
-        containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
-            "complete. Total runnable size 100, parallelism 16."));
   }
 
   @Test
   public void withManyTasks() throws Exception {
+    int count = 150000;
+    List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
 
-    List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
-
-    for (int i = 0; i < 150000; i++) {
+    for (int i = 0; i < count; i++) {
       tasks.add(new TestTask(0));
     }
 


[drill] 05/06: DRILL-6281: Introduce Collectors class for internal iterators

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d0a7545c854e5ef99ed45edd1fe3520aa6dcaa74
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Mon Apr 30 07:11:20 2018 -0700

    DRILL-6281: Introduce Collectors class for internal iterators
    
    closes #1238
---
 .../drill/common/collections/Collectors.java       | 123 +++++++++++++++++++++
 .../org/apache/drill/exec/store/TimedCallable.java |   9 +-
 .../exec/store/parquet/metadata/Metadata.java      |  17 ++-
 3 files changed, 132 insertions(+), 17 deletions(-)

diff --git a/common/src/main/java/org/apache/drill/common/collections/Collectors.java b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
new file mode 100644
index 0000000..3e80b2f
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/collections/Collectors.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.collections;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+
+public class Collectors {
+  private Collectors() {
+  }
+
+  /**
+   *
+   * @param map {@code Map<K, V>} to collect elements from
+   * @param mapper {@code BiFunction} that maps from (key, value) pair to type <T>
+   * @param <T> elements type in {@code List}
+   * @param <K> key type in {@code Map}
+   * @param <V> value type in {@code Map}
+   * @return new {@code List} that contains elements after applying mapper {@code BiFunction} to the input {@code Map}
+   */
+  public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper) {
+    return collect(new ArrayList<>(map.size()), map, mapper);
+  }
+
+  /**
+   *
+   * @param map {@code Map<K, V>} to collect elements from
+   * @param mapper {@code BiFunction} that maps from (key, value) pair to type <T>
+   * @param predicate {@code Predicate} filter to apply
+   * @param <T> elements type in {@code List}
+   * @param <K> keys type in {@code Map}
+   * @param <V> value type in {@code Map}
+   * @return new {@code List} that contains elements that satisfy {@code Predicate} after applying mapper {@code BiFunction}
+   *   to the input {@code Map}
+   */
+  public static <T, K, V> List<T> toList(Map<K, V> map, BiFunction<K, V, T> mapper, Predicate<T> predicate) {
+    return collect(new ArrayList<>(map.size()), map, mapper, predicate);
+  }
+
+  public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper) {
+    Preconditions.checkNotNull(list);
+    Preconditions.checkNotNull(map);
+    Preconditions.checkNotNull(mapper);
+    map.forEach((k, v) -> list.add(mapper.apply(k, v)));
+    return list;
+  }
+
+  public static <T, K, V> List<T> collect(List<T> list, Map<K, V> map, BiFunction<K, V, T> mapper, Predicate<T> predicate) {
+    Preconditions.checkNotNull(list);
+    Preconditions.checkNotNull(map);
+    Preconditions.checkNotNull(mapper);
+    Preconditions.checkNotNull(predicate);
+    map.forEach((k, v) -> {
+      T t = mapper.apply(k, v);
+      if (predicate.test(t)) {
+        list.add(t);
+      }
+    });
+    return list;
+  }
+
+  /**
+   *
+   * @param collection {@code Collection<E>} of elements of type <E>
+   * @param mapper {@code Function<E, T>} mapper function to apply
+   * @param <T> elements type in {@code List}
+   * @param <E> elements type in {@code Collection}
+   * @return new {@code List} that contains elements that satisfy {@code Predicate} after applying mapper {@code Function}
+   *   to the input {@code Collection}
+   */
+  public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper) {
+    Preconditions.checkNotNull(collection);
+    Preconditions.checkNotNull(mapper);
+    ArrayList<T> list = new ArrayList<>(collection.size());
+    collection.forEach(e -> list.add(mapper.apply(e)));
+    return list;
+  }
+
+  /**
+   *
+   * @param collection {@code Collection<E>} of elements of type <E>
+   * @param mapper {@code Function<E, T>} mapper function to apply
+   * @param predicate {@code Predicate} filter to apply
+   * @param <T>  elements type in {@code List}
+   * @param <E> elements type in {@code Collection}
+   * @return new {@code List} that contains elements after applying mapper {@code Function} to the input {@code Collection}
+   */
+  public static <T, E> List<T> toList(Collection<E> collection, Function<E, T> mapper, Predicate<T> predicate) {
+    Preconditions.checkNotNull(collection);
+    Preconditions.checkNotNull(mapper);
+    Preconditions.checkNotNull(predicate);
+    ArrayList<T> list = new ArrayList<>(collection.size());
+    collection.forEach(e -> {
+      T t = mapper.apply(e);
+      if (predicate.test(t)) {
+        list.add(t);
+      }
+    });
+    return list;
+  }
+}
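
A usage sketch for the new helper (names and values invented for illustration):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.drill.common.collections.Collectors;

    class CollectorsDemo {
      public static void main(String[] args) {
        Map<String, Integer> ports = new HashMap<>();
        ports.put("mongod-1", 27017);
        ports.put("mongod-2", 27018);
        // Map every (key, value) pair to an element of the result list ...
        List<String> all = Collectors.toList(ports, (name, port) -> name + ":" + port);
        // ... or keep only the mapped elements that pass a predicate.
        List<String> some = Collectors.toList(ports, (name, port) -> name + ":" + port,
            s -> s.endsWith(":27017"));
        System.out.println(all);
        System.out.println(some);
      }
    }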
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
index ecc5579..3c2bbfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -30,8 +29,8 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
+import org.apache.drill.common.collections.Collectors;
 import org.apache.drill.common.exceptions.UserException;
 
 import org.slf4j.Logger;
@@ -212,11 +211,7 @@ public abstract class TimedCallable<V> implements Callable<V> {
     final FutureMapper<V> futureMapper = new FutureMapper<>();
     final Statistics<V> statistics = logger.isDebugEnabled() ? new Statistics<>() : null;
     try {
-      return threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS)
-          .stream()
-          .map(futureMapper)
-          .filter(Objects::nonNull)
-          .collect(Collectors.toList());
+      return Collectors.toList(threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS), futureMapper);
     } catch (InterruptedException e) {
       final String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity);
       logger.error(errMsg, e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 49a6b52..cdf98e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.collections.Collectors;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
 import org.apache.drill.exec.store.TimedCallable;
@@ -68,7 +69,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
@@ -294,7 +294,7 @@ public class Metadata {
 
     Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream()
         .collect(
-            Collectors.toMap(
+            java.util.stream.Collectors.toMap(
                 Function.identity(),
                 s -> fs,
                 (oldFs, newFs) -> newFs,
@@ -335,14 +335,11 @@ public class Metadata {
    */
   private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
       ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException {
-
-    List<TimedCallable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream()
-        .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
-
-    List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>();
-    metaDataList.addAll(TimedCallable.run("Fetch parquet metadata", logger, gatherers, 16));
-    return metaDataList;
+    return TimedCallable.run("Fetch parquet metadata", logger,
+        Collectors.toList(fileStatusMap,
+            (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v3, fileStatus, fileSystem)),
+        16
+    );
   }
 
   /**
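
Note the fully qualified java.util.stream.Collectors.toMap in the hunk above: once Drill's own Collectors is imported, the JDK class of the same name can no longer be imported alongside it. In miniature (example data invented):

    import java.util.Arrays;
    import java.util.Map;
    import java.util.function.Function;

    import org.apache.drill.common.collections.Collectors;  // takes the short name

    class NameClash {
      static Map<String, Integer> lengths() {
        return Arrays.asList("a", "bb").stream()             // JDK collector, spelled out
            .collect(java.util.stream.Collectors.toMap(Function.identity(), String::length));
      }
    }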


[drill] 06/06: DRILL-6380: Fix sporadic mongo db hangs.

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 6cbba282d17d1670da5dd8788d1318a77abc0bf7
Author: Timothy Farkas <ti...@apache.org>
AuthorDate: Wed May 2 01:21:10 2018 -0700

    DRILL-6380: Fix sporadic mongo db hangs.
    
    closes #1249
---
 .../org/apache/drill/exec/store/mongo/MongoTestSuit.java     | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
index b3f0bd1..487396d 100644
--- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
+++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/MongoTestSuit.java
@@ -20,9 +20,10 @@ package org.apache.drill.exec.store.mongo;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
@@ -94,7 +95,9 @@ public class MongoTestSuit implements MongoTestConstants {
       configServers.add(crateConfigServerConfig(CONFIG_SERVER_3_PORT));
 
       // creating replicaSets
-      Map<String, List<IMongodConfig>> replicaSets = new HashMap<>();
+      // A LinkedHashMap ensures that the config servers are started first.
+      Map<String, List<IMongodConfig>> replicaSets = new LinkedHashMap<>();
+
       List<IMongodConfig> replicaSet1 = new ArrayList<>();
       replicaSet1.add(crateIMongodConfig(MONGOD_1_PORT, false,
           REPLICA_SET_1_NAME));
@@ -102,7 +105,6 @@ public class MongoTestSuit implements MongoTestConstants {
           REPLICA_SET_1_NAME));
       replicaSet1.add(crateIMongodConfig(MONGOD_3_PORT, false,
           REPLICA_SET_1_NAME));
-      replicaSets.put(REPLICA_SET_1_NAME, replicaSet1);
       List<IMongodConfig> replicaSet2 = new ArrayList<>();
       replicaSet2.add(crateIMongodConfig(MONGOD_4_PORT, false,
           REPLICA_SET_2_NAME));
@@ -110,8 +112,10 @@ public class MongoTestSuit implements MongoTestConstants {
           REPLICA_SET_2_NAME));
       replicaSet2.add(crateIMongodConfig(MONGOD_6_PORT, false,
           REPLICA_SET_2_NAME));
-      replicaSets.put(REPLICA_SET_2_NAME, replicaSet2);
+
       replicaSets.put(CONFIG_REPLICA_SET, configServers);
+      replicaSets.put(REPLICA_SET_1_NAME, replicaSet1);
+      replicaSets.put(REPLICA_SET_2_NAME, replicaSet2);
 
       // create mongos
       IMongosConfig mongosConfig = createIMongosConfig();
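
The fix relies on a documented property of LinkedHashMap: iteration follows insertion order, so putting the config-server replica set in first guarantees it is started before the two data replica sets. A standalone illustration:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    class StartupOrder {
      public static void main(String[] args) {
        Map<String, String> ordered = new LinkedHashMap<>();
        ordered.put("config", "first");
        ordered.put("rs1", "second");
        ordered.put("rs2", "third");
        System.out.println(ordered.keySet());                // [config, rs1, rs2], always
        System.out.println(new HashMap<>(ordered).keySet()); // order unspecified
      }
    }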


[drill] 03/06: DRILL-6281: Refactor TimedRunnable (rename TimedRunnable to TimedCallable)

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b913b3f47249957b2292864a970847f137090f7a
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Fri Apr 20 18:00:20 2018 -0700

    DRILL-6281: Refactor TimedRunnable (rename TimedRunnable to TimedCallable)
---
 .../store/{TimedRunnable.java => TimedCallable.java} |  8 ++++----
 .../drill/exec/store/parquet/FooterGatherer.java     |  8 ++++----
 .../drill/exec/store/parquet/metadata/Metadata.java  |  8 ++++----
 .../drill/exec/store/schedule/BlockMapBuilder.java   |  8 ++++----
 ...TestTimedRunnable.java => TestTimedCallable.java} | 20 ++++++++++----------
 5 files changed, 26 insertions(+), 26 deletions(-)
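
This commit only renames; the behavioral switch from implements Runnable to implements Callable<V> lands in the follow-up (04/06). The difference the later commit exploits, in miniature:

    import java.util.concurrent.Callable;

    class RunnableVsCallable {
      // Runnable: run() returns nothing and cannot throw checked exceptions.
      static final Runnable R = () -> System.out.println("side effect only");
      // Callable<V>: call() returns a value and may throw Exception.
      static final Callable<String> C = () -> "result";
    }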

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
similarity index 97%
rename from exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
rename to exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
index 7cce2ad..1afd716 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedRunnable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
@@ -36,7 +36,7 @@ import com.google.common.collect.Lists;
  * TODO: look at switching to fork join.
  * @param <V> The time value that will be returned when the task is executed.
  */
-public abstract class TimedRunnable<V> implements Runnable {
+public abstract class TimedCallable<V> implements Runnable {
 
   private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
 
@@ -111,7 +111,7 @@ public abstract class TimedRunnable<V> implements Runnable {
    * @return The list of outcome objects.
   * @throws IOException All exceptions are coerced to IOException since this was built for storage system tasks initially.
    */
-  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedRunnable<V>> runnables, int parallelism) throws IOException {
+  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> runnables, int parallelism) throws IOException {
     Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     long timedRunnableStart=System.nanoTime();
     if(runnables.size() == 1){
@@ -122,7 +122,7 @@ public abstract class TimedRunnable<V> implements Runnable {
       final ExtendedLatch latch = new ExtendedLatch(runnables.size());
       final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
       try{
-        for(TimedRunnable<V> runnable : runnables){
+        for(TimedCallable<V> runnable : runnables){
           threadPool.submit(new LatchedRunnable(latch, runnable));
         }
 
@@ -165,7 +165,7 @@ public abstract class TimedRunnable<V> implements Runnable {
     long latestStart=0;
     long totalStart=0;
     IOException excep = null;
-    for(final TimedRunnable<V> reader : runnables){
+    for(final TimedCallable<V> reader : runnables){
       try{
         values.add(reader.getValue());
         sum += reader.getTimeSpentNanos();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index 3ba6ff0..91219c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -66,7 +66,7 @@ public class FooterGatherer {
   }
 
   public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
-    final List<TimedRunnable<Footer>> readers = Lists.newArrayList();
+    final List<TimedCallable<Footer>> readers = Lists.newArrayList();
     List<Footer> foundFooters = Lists.newArrayList();
     for (FileStatus status : statuses) {
 
@@ -92,14 +92,14 @@ public class FooterGatherer {
 
     }
     if(!readers.isEmpty()){
-      foundFooters.addAll(TimedRunnable.run("Fetch Parquet Footers", logger, readers, parallelism));
+      foundFooters.addAll(TimedCallable.run("Fetch Parquet Footers", logger, readers, parallelism));
     }
 
     return foundFooters;
   }
 
 
-  private static class FooterReader extends TimedRunnable<Footer>{
+  private static class FooterReader extends TimedCallable<Footer> {
 
     final Configuration conf;
     final FileStatus status;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index cdb28c2..24bde1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -30,7 +30,7 @@ import com.google.common.collect.Maps;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
-import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
@@ -333,19 +333,19 @@ public class Metadata {
   private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
       ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException {
 
-    List<TimedRunnable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream()
+    List<TimedCallable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream()
         .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue()))
         .collect(Collectors.toList());
 
     List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>();
-    metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16));
+    metaDataList.addAll(TimedCallable.run("Fetch parquet metadata", logger, gatherers, 16));
     return metaDataList;
   }
 
   /**
   * TimedCallable that reads the footer from parquet and collects file metadata
    */
-  private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v3> {
+  private class MetadataGatherer extends TimedCallable<ParquetFileMetadata_v3> {
 
     private final ParquetTableMetadata_v3 parquetTableMetadata;
     private final FileStatus fileStatus;
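
The first hunk in this file also illustrates the calling idiom the refactoring converges on: derive the task list from a map with the Java 8 stream API, then hand the whole list to the parallel runner in one call. Below is a tiny, self-contained sketch of that idiom, with strings standing in for FileStatus and FileSystem; the names here are made up for illustration.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamToTasksSketch {
  public static void main(String[] args) {
    // Stand-in for Map<FileStatus, FileSystem>.
    Map<String, String> inputs = new LinkedHashMap<>();
    inputs.put("/data/a.parquet", "fs-1");
    inputs.put("/data/b.parquet", "fs-2");

    // Same shape as the hunk above: one task object per map entry,
    // collected into a list for a TimedCallable.run(...)-style call.
    List<Runnable> gatherers = inputs.entrySet().stream()
        .map(e -> (Runnable) () ->
            System.out.println("gathering metadata for " + e.getKey()
                + " via " + e.getValue()))
        .collect(Collectors.toList());

    gatherers.forEach(Runnable::run);
  }
}
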
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 942afa1..6bd2ede 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -70,11 +70,11 @@ public class BlockMapBuilder {
 
   public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException {
 
-    List<TimedRunnable<List<CompleteFileWork>>> readers = Lists.newArrayList();
+    List<TimedCallable<List<CompleteFileWork>>> readers = Lists.newArrayList();
     for(FileStatus status : files){
       readers.add(new BlockMapReader(status, blockify));
     }
-    List<List<CompleteFileWork>> work = TimedRunnable.run("Get block maps", logger, readers, 16);
+    List<List<CompleteFileWork>> work = TimedCallable.run("Get block maps", logger, readers, 16);
     List<CompleteFileWork> singleList = Lists.newArrayList();
     for(List<CompleteFileWork> innerWorkList : work){
       singleList.addAll(innerWorkList);
@@ -84,7 +84,7 @@ public class BlockMapBuilder {
 
   }
 
-  private class BlockMapReader extends TimedRunnable<List<CompleteFileWork>> {
+  private class BlockMapReader extends TimedCallable<List<CompleteFileWork>> {
     final FileStatus status;
 
     // This variable blockify indicates if a single file can be read by multiple threads
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
similarity index 84%
rename from exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
rename to exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
index 27b1ed2..a34383d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedRunnable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
@@ -35,16 +35,16 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 /**
- * Unit testing for {@link TimedRunnable}.
+ * Unit testing for {@link TimedCallable}.
  */
 @Category({SlowTest.class})
-public class TestTimedRunnable extends DrillTest {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedRunnable.class);
+public class TestTimedCallable extends DrillTest {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTimedCallable.class);
 
   @Rule
   public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // 3mins
 
-  private static class TestTask extends TimedRunnable {
+  private static class TestTask extends TimedCallable {
     final long sleepTime; // sleep time in ms
 
     public TestTask(final long sleepTime) {
@@ -69,13 +69,13 @@ public class TestTimedRunnable extends DrillTest {
 
   @Test
   public void withoutAnyTasksTriggeringTimeout() throws Exception {
-    List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+    List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
 
     for(int i=0; i<100; i++){
       tasks.add(new TestTask(2000));
     }
 
-    TimedRunnable.run("Execution without triggering timeout", logger, tasks, 16);
+    TimedCallable.run("Execution without triggering timeout", logger, tasks, 16);
   }
 
   @Test
@@ -83,7 +83,7 @@ public class TestTimedRunnable extends DrillTest {
     UserException ex = null;
 
     try {
-      List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+      List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
 
       for (int i = 0; i < 100; i++) {
         if ((i & (i + 1)) == 0) {
@@ -93,7 +93,7 @@ public class TestTimedRunnable extends DrillTest {
         }
       }
 
-      TimedRunnable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
+      TimedCallable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
     } catch (UserException e) {
       ex = e;
     }
@@ -107,12 +107,12 @@ public class TestTimedRunnable extends DrillTest {
   @Test
   public void withManyTasks() throws Exception {
 
-    List<TimedRunnable<TestTask>> tasks = Lists.newArrayList();
+    List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
 
     for (int i = 0; i < 150000; i++) {
       tasks.add(new TestTask(0));
     }
 
-    TimedRunnable.run("Execution with lots of tasks", logger, tasks, 16);
+    TimedCallable.run("Execution with lots of tasks", logger, tasks, 16);
   }
 }


[drill] 02/06: DRILL-6347: Change method names to "visitField".

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 21733b01a4275b508d65768a578070f633548095
Author: Kui LIU <br...@gmail.com>
AuthorDate: Mon Apr 30 08:43:00 2018 +0200

    DRILL-6347: Change method names to "visitField".
    
    Further change the method names to "visitField", as suggested in Vlad Rozov's review comments.
    
    closes #1236
---
 .../planner/physical/visitor/PrelVisualizerVisitor.java  | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
index 703d648..0ee685f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisualizerVisitor.java
@@ -87,11 +87,11 @@ public class PrelVisualizerVisitor
 
     }
 
-    public void field(String label, boolean value) {
-      field(label, Boolean.toString(value));
+    public void visitField(String label, boolean value) {
+      visitField(label, Boolean.toString(value));
     }
 
-    private void field(String label, String value) {
+    private void visitField(String label, String value) {
       indent();
       out.append(label)
          .append(" = ")
@@ -99,10 +99,10 @@ public class PrelVisualizerVisitor
          .append("\n");
     }
 
-    public void listField(String label,
+    public void visitField(String label,
         Object[] values) {
       if (values == null) {
-        field(label, "null");
+        visitField(label, "null");
         return;
       }
       StringBuilder buf = new StringBuilder();
@@ -120,7 +120,7 @@ public class PrelVisualizerVisitor
         }
       }
       buf.append("]");
-      field(label, buf.toString());
+      visitField(label, buf.toString());
     }
 
     @Override
@@ -156,8 +156,8 @@ public class PrelVisualizerVisitor
 
   private void visitBasePrel(Prel prel, VisualizationState value) {
     value.startNode(prel);
-    value.listField("encodings", prel.getSupportedEncodings());
-    value.field("needsReorder", prel.needsFinalColumnReordering());
+    value.visitField("encodings", prel.getSupportedEncodings());
+    value.visitField("needsReorder", prel.needsFinalColumnReordering());
   }
 
   private void endNode(Prel prel, VisualizationState value) throws Exception {
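
The net effect of the rename is a conventional overload set: all three methods share the name visitField, and the boolean and array overloads delegate to the private String overload that does the actual indenting and printing. Below is a compact, self-contained sketch of that scheme; FieldPrinter is a hypothetical stand-in for the visitor's VisualizationState, not the class itself.

import java.util.Arrays;

public class FieldPrinter {
  private final StringBuilder out = new StringBuilder();

  public void visitField(String label, boolean value) {
    visitField(label, Boolean.toString(value)); // delegate to the String form
  }

  public void visitField(String label, Object[] values) {
    // Arrays.toString yields the same "[a, b]" shape the visitor builds by hand.
    visitField(label, values == null ? "null" : Arrays.toString(values));
  }

  private void visitField(String label, String value) {
    out.append("  ") // fixed indent; the real code tracks nesting depth
       .append(label)
       .append(" = ")
       .append(value)
       .append("\n");
  }

  public static void main(String[] args) {
    FieldPrinter p = new FieldPrinter();
    p.visitField("needsReorder", true);
    p.visitField("encodings", new Object[] {"NONE", "TWO_BYTE"});
    System.out.print(p.out);
  }
}
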
