You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/05/05 19:07:18 UTC

[drill] 04/06: DRILL-6281: Refactor TimedRunnable

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ac96381e455a6127b7abad2cf19508f160b136cd
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Sat Apr 21 08:07:42 2018 -0700

    DRILL-6281: Refactor TimedRunnable
---
 .../org/apache/drill/exec/store/TimedCallable.java | 334 ++++++++++++---------
 .../drill/exec/store/parquet/FooterGatherer.java   |  14 +-
 .../exec/store/parquet/metadata/Metadata.java      |  12 +-
 .../drill/exec/store/schedule/BlockMapBuilder.java |  17 +-
 .../apache/drill/exec/store/TestTimedCallable.java |  45 ++-
 5 files changed, 239 insertions(+), 183 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
index 1afd716..ecc5579 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java
@@ -19,190 +19,248 @@ package org.apache.drill.exec.store;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-import com.google.common.base.Stopwatch;
-import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.exceptions.UserException;
+
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
  * TODO: look at switching to fork join.
  * @param <V> The time value that will be returned when the task is executed.
  */
-public abstract class TimedCallable<V> implements Runnable {
+public abstract class TimedCallable<V> implements Callable<V> {
+  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
 
   private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
 
-  private volatile Exception e;
-  private volatile long threadStart;
-  private volatile long timeNanos;
-  private volatile V value;
+  private volatile long startTime = 0;
+  private volatile long executionTime = -1;
 
-  @Override
-  public final void run() {
-    long start = System.nanoTime();
-    threadStart=start;
-    try{
-      value = runInner();
-    }catch(Exception e){
-      this.e = e;
-    }finally{
-      timeNanos = System.nanoTime() - start;
-    }
-  }
+  private static class FutureMapper<V> implements Function<Future<V>, V> {
+    int count;
+    Throwable throwable = null;
 
-  protected abstract V runInner() throws Exception ;
-  protected abstract IOException convertToIOException(Exception e);
+    private void setThrowable(Throwable t) {
+      if (throwable == null) {
+        throwable = t;
+      } else {
+        throwable.addSuppressed(t);
+      }
+    }
 
-  public long getThreadStart(){
-    return threadStart;
-  }
-  public long getTimeSpentNanos(){
-    return timeNanos;
+    @Override
+    public V apply(Future<V> future) {
+      Preconditions.checkState(future.isDone());
+      if (!future.isCancelled()) {
+        try {
+          count++;
+          return future.get();
+        } catch (InterruptedException e) {
+          // there is no wait as we are getting result from the completed/done future
+          logger.error("Unexpected exception", e);
+          throw UserException.internalError(e)
+              .message("Unexpected exception")
+              .build(logger);
+        } catch (ExecutionException e) {
+          setThrowable(e.getCause());
+        }
+      } else {
+        setThrowable(new CancellationException());
+      }
+      return null;
+    }
   }
 
-  public final V getValue() throws IOException {
-    if(e != null){
-      if(e instanceof IOException){
-        throw (IOException) e;
-      }else{
-        throw convertToIOException(e);
+  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
+    final long start = System.nanoTime();
+    final Stopwatch watch = Stopwatch.createStarted();
+    long totalExecution;
+    long maxExecution;
+    int count;
+    int startedCount;
+    private int doneCount;
+    // measure thread creation times
+    long earliestStart;
+    long latestStart;
+    long totalStart;
+
+    @Override
+    public void accept(TimedCallable<V> task) {
+      count++;
+      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
+      if (threadStart >= 0) {
+        startedCount++;
+        earliestStart = Math.min(earliestStart, threadStart);
+        latestStart = Math.max(latestStart, threadStart);
+        totalStart += threadStart;
+        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
+        if (executionTime != -1) {
+          doneCount++;
+          totalExecution += executionTime;
+          maxExecution = Math.max(maxExecution, executionTime);
+        } else {
+          logger.info("Task {} started at {} did not finish", task, threadStart);
+        }
+      } else {
+        logger.info("Task {} never commenced execution", task);
       }
     }
 
-    return value;
-  }
+    Statistics<V> collect(final List<TimedCallable<V>> tasks) {
+      totalExecution = maxExecution = 0;
+      count = startedCount = doneCount = 0;
+      earliestStart = Long.MAX_VALUE;
+      latestStart = totalStart = 0;
+      tasks.forEach(this);
+      return this;
+    }
 
-  private static class LatchedRunnable implements Runnable {
-    final CountDownLatch latch;
-    final Runnable runnable;
+    void log(final String activity, final Logger logger, int parallelism) {
+      if (startedCount > 0) {
+        logger.debug("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).",
+            activity, startedCount, count, parallelism,
+            TimeUnit.NANOSECONDS.toMillis(earliestStart),
+            TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
+            TimeUnit.NANOSECONDS.toMillis(latestStart));
+      } else {
+        logger.debug("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism);
+      }
 
-    public LatchedRunnable(CountDownLatch latch, Runnable runnable){
-      this.latch = latch;
-      this.runnable = runnable;
+      if (doneCount > 0) {
+        logger.debug("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).",
+            activity, doneCount, count, parallelism, watch.elapsed(TimeUnit.MILLISECONDS),
+            TimeUnit.NANOSECONDS.toMillis(totalExecution) / doneCount, TimeUnit.NANOSECONDS.toMillis(maxExecution));
+      } else {
+        logger.debug("{}: completed {} out of {} using {} threads", activity, doneCount, count, parallelism);
+      }
     }
+  }
 
-    @Override
-    public void run() {
-      try{
-        runnable.run();
-      }finally{
-        latch.countDown();
+  @Override
+  public final V call() throws Exception {
+    long start = System.nanoTime();
+    startTime = start;
+    try {
+      logger.debug("Started execution of '{}' task at {} ms", this, TimeUnit.MILLISECONDS.convert(start, TimeUnit.NANOSECONDS));
+      return runInner();
+    } catch (InterruptedException e) {
+      logger.warn("Task '{}' interrupted", this, e);
+      throw e;
+    } finally {
+      long time = System.nanoTime() - start;
+      if (logger.isWarnEnabled()) {
+        long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
+        if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
+          logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
+        } else {
+          logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
+        }
       }
+      executionTime = time;
     }
   }
 
+  protected abstract V runInner() throws Exception;
+
+  private long getStartTime(TimeUnit unit) {
+    return unit.convert(startTime, TimeUnit.NANOSECONDS);
+  }
+
+  private long getExecutionTime(TimeUnit unit) {
+    return unit.convert(executionTime, TimeUnit.NANOSECONDS);
+  }
+
+
   /**
    * Execute the list of runnables with the given parallelization.  At end, return values and report completion time
    * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout exceeds, existing/pending
    * tasks will be cancelled and a {@link UserException} is thrown.
    * @param activity Name of activity for reporting in logger.
    * @param logger The logger to use to report results.
-   * @param runnables List of runnables that should be executed and timed.  If this list has one item, task will be
-   *                  completed in-thread. Runnable must handle {@link InterruptedException}s.
+   * @param tasks List of callables that should be executed and timed.  If this list has one item, the task will be
+   *                  completed in-thread. Each callable must handle {@link InterruptedException}s.
    * @param parallelism  The number of threads that should be run to complete this task.
    * @return The list of outcome objects.
    * @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
    */
-  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> runnables, int parallelism) throws IOException {
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    long timedRunnableStart=System.nanoTime();
-    if(runnables.size() == 1){
-      parallelism = 1;
-      runnables.get(0).run();
-    }else{
-      parallelism = Math.min(parallelism,  runnables.size());
-      final ExtendedLatch latch = new ExtendedLatch(runnables.size());
-      final ExecutorService threadPool = Executors.newFixedThreadPool(parallelism);
-      try{
-        for(TimedCallable<V> runnable : runnables){
-          threadPool.submit(new LatchedRunnable(latch, runnable));
-        }
-
-        final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * runnables.size())/parallelism);
-        if (!latch.awaitUninterruptibly(timeout)) {
-          // Issue a shutdown request. This will cause existing threads to interrupt and pending threads to cancel.
-          // It is highly important that the task Runnables are handling interrupts correctly.
-          threadPool.shutdownNow();
-
-          try {
-            // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
-            // any running threads. If the runnables are handling the interrupts properly they should be able to
-            // wrap up and terminate. If not waiting for 5s here gives a chance to identify and log any potential
-            // thread leaks.
-            threadPool.awaitTermination(5, TimeUnit.SECONDS);
-          } catch (final InterruptedException e) {
-            logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
-          }
-
-          final String errMsg = String.format("Waited for %dms, but tasks for '%s' are not complete. " +
-              "Total runnable size %d, parallelism %d.", timeout, activity, runnables.size(), parallelism);
-          logger.error(errMsg);
-          throw UserException.resourceError()
-              .message(errMsg)
-              .build(logger);
-        }
-      } finally {
-        if (!threadPool.isShutdown()) {
-          threadPool.shutdown();
-        }
+  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism) throws IOException {
+    Preconditions.checkArgument(!Preconditions.checkNotNull(tasks).isEmpty(), "list of tasks is empty");
+    Preconditions.checkArgument(parallelism > 0);
+    parallelism = Math.min(parallelism, tasks.size());
+    final ExecutorService threadPool = parallelism == 1 ? MoreExecutors.newDirectExecutorService()
+        : Executors.newFixedThreadPool(parallelism, new ThreadFactoryBuilder().setNameFormat(activity + "-%d").build());
+    final long timeout = TIMEOUT_PER_RUNNABLE_IN_MSECS * ((tasks.size() - 1)/parallelism + 1);
+    final FutureMapper<V> futureMapper = new FutureMapper<>();
+    final Statistics<V> statistics = logger.isDebugEnabled() ? new Statistics<>() : null;
+    try {
+      return threadPool.invokeAll(tasks, timeout, TimeUnit.MILLISECONDS)
+          .stream()
+          .map(futureMapper)
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
+    } catch (InterruptedException e) {
+      final String errMsg = String.format("Interrupted while waiting for activity '%s' tasks to be done.", activity);
+      logger.error(errMsg, e);
+      throw UserException.resourceError(e)
+          .message(errMsg)
+          .build(logger);
+    } catch (RejectedExecutionException e) {
+      final String errMsg = String.format("Failure while submitting activity '%s' tasks for execution.", activity);
+      logger.error(errMsg, e);
+      throw UserException.internalError(e)
+          .message(errMsg)
+          .build(logger);
+    } finally {
+      List<Runnable> notStartedTasks = threadPool.shutdownNow();
+      if (!notStartedTasks.isEmpty()) {
+        logger.error("{} activity '{}' tasks never commenced execution.", notStartedTasks.size(), activity);
       }
-    }
-
-    List<V> values = Lists.newArrayList();
-    long sum = 0;
-    long max = 0;
-    long count = 0;
-    // measure thread creation times
-    long earliestStart=Long.MAX_VALUE;
-    long latestStart=0;
-    long totalStart=0;
-    IOException excep = null;
-    for(final TimedCallable<V> reader : runnables){
-      try{
-        values.add(reader.getValue());
-        sum += reader.getTimeSpentNanos();
-        count++;
-        max = Math.max(max, reader.getTimeSpentNanos());
-        earliestStart=Math.min(earliestStart, reader.getThreadStart() - timedRunnableStart);
-        latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
-        totalStart+=latestStart=Math.max(latestStart, reader.getThreadStart()-timedRunnableStart);
-      }catch(IOException e){
-        if(excep == null){
-          excep = e;
-        }else{
-          excep.addSuppressed(e);
+      try {
+        // Wait for 5s for currently running threads to terminate. Above call (threadPool.shutdownNow()) interrupts
+        // any running threads. If the tasks are handling the interrupts properly they should be able to
+        // wrap up and terminate. If not waiting for 5s here gives a chance to identify and log any potential
+        // thread leaks.
+        if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+          logger.error("Detected run away tasks in activity '{}'.", activity);
         }
+      } catch (final InterruptedException e) {
+        logger.warn("Interrupted while waiting for pending threads in activity '{}' to terminate.", activity);
       }
-    }
-
-    if (watch != null) {
-      double avg = (sum/1000.0/1000.0)/(count*1.0d);
-      double avgStart = (totalStart/1000.0)/(count*1.0d);
-
-      logger.debug(
-          String.format("%s: Executed %d out of %d using %d threads. "
-              + "Time: %dms total, %fms avg, %dms max.",
-              activity, count, runnables.size(), parallelism, watch.elapsed(TimeUnit.MILLISECONDS), avg, max/1000/1000));
-      logger.debug(
-              String.format("%s: Executed %d out of %d using %d threads. "
-                              + "Earliest start: %f \u03BCs, Latest start: %f \u03BCs, Average start: %f \u03BCs .",
-                      activity, count, runnables.size(), parallelism, earliestStart/1000.0, latestStart/1000.0, avgStart));
-      watch.stop();
-    }
 
-    if (excep != null) {
-      throw excep;
+      if (statistics != null) {
+        statistics.collect(tasks).log(activity, logger, parallelism);
+      }
+      if (futureMapper.count != tasks.size()) {
+        final String errMsg = String.format("Waited for %d ms, but only %d tasks for '%s' are complete." +
+            " Total number of tasks %d, parallelism %d.", timeout, futureMapper.count, activity, tasks.size(), parallelism);
+        logger.error(errMsg, futureMapper.throwable);
+        throw UserException.resourceError(futureMapper.throwable)
+            .message(errMsg)
+            .build(logger);
+      }
+      if (futureMapper.throwable != null) {
+        throw (futureMapper.throwable instanceof IOException) ?
+            (IOException)futureMapper.throwable : new IOException(futureMapper.throwable);
+      }
     }
-
-    return values;
-
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index 91219c7..ea34c7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -19,10 +19,12 @@ package org.apache.drill.exec.store.parquet;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -38,7 +40,8 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 
 public class FooterGatherer {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -66,8 +69,8 @@ public class FooterGatherer {
   }
 
   public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
-    final List<TimedCallable<Footer>> readers = Lists.newArrayList();
-    List<Footer> foundFooters = Lists.newArrayList();
+    final List<TimedCallable<Footer>> readers = new ArrayList<>();
+    final List<Footer> foundFooters = new ArrayList<>();
     for (FileStatus status : statuses) {
 
 
@@ -116,10 +119,9 @@ public class FooterGatherer {
     }
 
     @Override
-    protected IOException convertToIOException(Exception e) {
-      return new IOException("Failure while trying to get footer for file " + status.getPath(), e);
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", status.getPath()).toString();
     }
-
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 24bde1d..49a6b52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -27,6 +27,8 @@ import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
@@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
 import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
@@ -362,13 +365,8 @@ public class Metadata {
       return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs);
     }
 
-    @Override
-    protected IOException convertToIOException(Exception e) {
-      if (e instanceof IOException) {
-        return (IOException) e;
-      } else {
-        return new IOException(e);
-      }
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", fileStatus.getPath()).toString();
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 6bd2ede..fdc8ba3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -19,14 +19,17 @@ package org.apache.drill.exec.store.schedule;
 
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.TimedCallable;
@@ -48,6 +51,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+
 public class BlockMapBuilder {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class);
   static final MetricRegistry metrics = DrillMetrics.getRegistry();
@@ -70,7 +75,7 @@ public class BlockMapBuilder {
 
   public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException {
 
-    List<TimedCallable<List<CompleteFileWork>>> readers = Lists.newArrayList();
+    List<TimedCallable<List<CompleteFileWork>>> readers = new ArrayList<>(files.size());
     for(FileStatus status : files){
       readers.add(new BlockMapReader(status, blockify));
     }
@@ -103,9 +108,9 @@ public class BlockMapBuilder {
 
     @Override
     protected List<CompleteFileWork> runInner() throws Exception {
-      final List<CompleteFileWork> work = Lists.newArrayList();
+      final List<CompleteFileWork> work = new ArrayList<>();
 
-      final Set<String> noDrillbitHosts = logger.isDebugEnabled() ? Sets.<String>newHashSet() : null;
+      final Set<String> noDrillbitHosts = logger.isDebugEnabled() ? new HashSet<>() : null;
 
       boolean error = false;
       if (blockify && !compressed(status)) {
@@ -143,12 +148,10 @@ public class BlockMapBuilder {
       return work;
     }
 
-
     @Override
-    protected IOException convertToIOException(Exception e) {
-      return new IOException("Failure while trying to get block map for " + status.getPath(), e);
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("path", status.getPath()).toString();
     }
-
   }
 
   private class FileStatusWork implements FileWork{
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
index a34383d..ea34230 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestTimedCallable.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.store;
 
-import com.google.common.collect.Lists;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.test.TestTools;
 import org.apache.drill.test.DrillTest;
@@ -27,12 +27,13 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
 import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Unit testing for {@link TimedCallable}.
@@ -53,25 +54,22 @@ public class TestTimedCallable extends DrillTest {
 
     @Override
     protected Void runInner() throws Exception {
-      try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException e) {
-        throw e;
-      }
+      Thread.sleep(sleepTime);
       return null;
     }
 
     @Override
-    protected IOException convertToIOException(Exception e) {
-      return new IOException("Failure while trying to sleep for sometime", e);
+    public String toString() {
+      return new ToStringBuilder(this, SHORT_PREFIX_STYLE).append("sleepTime", sleepTime).toString();
     }
   }
 
   @Test
   public void withoutAnyTasksTriggeringTimeout() throws Exception {
-    List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
+    int count = 100;
+    List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
 
-    for(int i=0; i<100; i++){
+    for (int i = 0; i < count; i++) {
       tasks.add(new TestTask(2000));
     }
 
@@ -80,12 +78,11 @@ public class TestTimedCallable extends DrillTest {
 
   @Test
   public void withTasksExceedingTimeout() throws Exception {
-    UserException ex = null;
-
     try {
-      List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
+      int count = 100;
+      List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
 
-      for (int i = 0; i < 100; i++) {
+      for (int i = 0; i < count; i++) {
         if ((i & (i + 1)) == 0) {
           tasks.add(new TestTask(2000));
         } else {
@@ -94,22 +91,20 @@ public class TestTimedCallable extends DrillTest {
       }
 
       TimedCallable.run("Execution with some tasks triggering timeout", logger, tasks, 16);
+      fail("Expected a UserException");
     } catch (UserException e) {
-      ex = e;
+      assertThat(e.getMessage(),
+          containsString("Waited for 105000 ms, but only 87 tasks for 'Execution with some tasks triggering timeout' are " +
+              "complete. Total number of tasks 100, parallelism 16."));
     }
-
-    assertNotNull("Expected a UserException", ex);
-    assertThat(ex.getMessage(),
-        containsString("Waited for 93750ms, but tasks for 'Execution with some tasks triggering timeout' are not " +
-            "complete. Total runnable size 100, parallelism 16."));
   }
 
   @Test
   public void withManyTasks() throws Exception {
+    int count = 150000;
+    List<TimedCallable<TestTask>> tasks = new ArrayList<>(count);
 
-    List<TimedCallable<TestTask>> tasks = Lists.newArrayList();
-
-    for (int i = 0; i < 150000; i++) {
+    for (int i = 0; i < count; i++) {
       tasks.add(new TestTask(0));
     }
 

-- 
To stop receiving notification emails like this one, please contact
volodymyr@apache.org.