Posted to dev@drill.apache.org by vrozov <gi...@git.apache.org> on 2018/04/23 03:00:08 UTC

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

GitHub user vrozov opened a pull request:

    https://github.com/apache/drill/pull/1238

     DRILL-6281: Refactor TimedRunnable

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vrozov/drill DRILL-6281

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1238.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1238
    
----
commit c523734a563c62f58ea1e7161ad366777ba62035
Author: Vlad Rozov <vr...@...>
Date:   2018-04-21T01:00:20Z

    DRILL-6281: Refactor TimedRunnable (rename TimedRunnable to TimedCallable)

commit 63a483b393450e95d09911d756caf670f0a1fdb6
Author: Vlad Rozov <vr...@...>
Date:   2018-04-21T15:07:42Z

    DRILL-6281: Refactor TimedRunnable

----


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    DRILL-6281 is a subtask (a preparation step) for DRILL-5908 (Regression: Query intermittently may fail with error "Waited for 15000ms, but tasks for 'Get block maps' are not complete."). The refactoring is necessary to perform a root cause analysis (RCA).
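The error quoted above is reported when a batch of tasks does not finish within its overall timeout. Because the refactoring turns each task into a `Callable`, results and failures can be read back from `Future`s, which is what the `FutureMapper` class in the TimedCallable diff quoted later in this thread does. The sketch below illustrates that pattern using only the JDK. It assumes the truncated `run` method submits tasks through `ExecutorService.invokeAll` with a timeout (the quoted diff stops before that point), and the class and method names are hypothetical, so treat it as an illustration rather than the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Drill's TimedCallable.
final class TimedBatchSketch {

  /** Runs all tasks under one overall timeout; failures are chained as suppressed exceptions. */
  static <V> List<V> runAll(List<Callable<V>> tasks, int parallelism, long timeoutMs) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      // invokeAll returns when every task is done or the timeout expires;
      // tasks still running at the deadline are cancelled.
      List<Future<V>> futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
      List<V> results = new ArrayList<>();
      Throwable first = null;
      for (Future<V> future : futures) {
        Throwable failure = null;
        if (future.isCancelled()) {
          failure = new CancellationException("task did not complete in time");
        } else {
          try {
            results.add(future.get()); // does not block: the future is already done
          } catch (ExecutionException e) {
            failure = e.getCause();
          }
        }
        if (failure != null) {
          if (first == null) {
            first = failure;
          } else {
            first.addSuppressed(failure); // same idea as FutureMapper.setThrowable
          }
        }
      }
      if (first != null) {
        throw new Exception("some tasks failed or timed out", first);
      }
      return results;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<Integer>> tasks = new ArrayList<>();
    for (int i = 1; i <= 4; i++) {
      final int n = i;
      tasks.add(() -> n * n);
    }
    System.out.println(runAll(tasks, 2, 1_000)); // [1, 4, 9, 16]
  }
}
```

Collecting every failure under the first one (rather than stopping at the first) keeps the diagnostic information for all tasks, which matters when the goal is root cause analysis.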


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r183576419
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java ---
    @@ -66,8 +69,8 @@ private static void checkMagicBytes(FileStatus status, byte[] data, int offset)
       }
     
       public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
    -    final List<TimedRunnable<Footer>> readers = Lists.newArrayList();
    -    List<Footer> foundFooters = Lists.newArrayList();
    +    final List<TimedCallable<Footer>> readers = new ArrayList<>();
    +    final List<Footer> foundFooters = new ArrayList<>();
    --- End diff --
    
    Is there a specific reason not to use Lists.newArrayList?
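For context on the question above: since Java 7 the diamond operator lets `new ArrayList<>()` infer its type argument from the declared type, and Guava's own Javadoc for the no-argument `Lists.newArrayList()` describes that method as unnecessary on Java 7 and later. A minimal plain-JDK illustration (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example; no Guava dependency is needed.
final class DiamondExample {
  static List<String> footers() {
    // The type argument <String> is inferred from the declared type on the left.
    final List<String> found = new ArrayList<>();
    found.add("footer-1");
    return found;
  }

  public static void main(String[] args) {
    System.out.println(footers()); // [footer-1]
  }
}
```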


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r183875553
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution;
    +    long maxExecution;
    +    int count;
    +    int startedCount;
    +    private int doneCount;
    +    // measure thread creation times
    +    long earliestStart;
    +    long latestStart;
    +    long totalStart;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      count++;
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    +        }
    +      } else {
    +        logger.info("Task {} never commenced execution", task);
    +      }
    +    }
    +
    +    Statistics<V> collect(final List<TimedCallable<V>> tasks) {
    +      totalExecution = maxExecution = 0;
    +      count = startedCount = doneCount = 0;
    +      earliestStart = Long.MAX_VALUE;
    +      latestStart = totalStart = 0;
    +      tasks.forEach(this);
    +      return this;
    +    }
    +
    +    void info(final String activity, final Logger logger, int parallelism) {
    +      if (startedCount > 0) {
    +        logger.info("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).",
    +            activity, startedCount, count, parallelism,
    +            TimeUnit.NANOSECONDS.toMillis(earliestStart),
    +            TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
    +            TimeUnit.NANOSECONDS.toMillis(latestStart));
    +      } else {
    +        logger.info("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism);
    +      }
    +
    +      if (doneCount > 0) {
    +        logger.info("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).",
    +            activity, doneCount, count, parallelism, watch.elapsed(TimeUnit.MILLISECONDS),
    +            TimeUnit.NANOSECONDS.toMillis(totalExecution) / doneCount, TimeUnit.NANOSECONDS.toMillis(maxExecution));
    +      } else {
    +        logger.info("{}: completed {} out of {} using {} threads", activity, doneCount, count, parallelism);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public final V call() throws Exception {
    +    long start = System.nanoTime();
    +    startTime = start;
    +    try {
    +      logger.debug("Started execution of '{}' task at {} ms", this, TimeUnit.MILLISECONDS.convert(start, TimeUnit.NANOSECONDS));
    +      return runInner();
    +    } catch (InterruptedException e) {
    +      logger.warn("Task '{}' interrupted", this, e);
    +      throw e;
    +    } finally {
    +      long time = System.nanoTime() - start;
    +      if (logger.isWarnEnabled()) {
    +        long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
    +        if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
    +          logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
    +        } else {
    +          logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
    +        }
    +      }
    +      executionTime = time;
    +    }
    +  }
    +
    +  protected abstract V runInner() throws Exception;
    +
    +  private long getStartTime(TimeUnit unit) {
    +    return unit.convert(startTime, TimeUnit.NANOSECONDS);
    +  }
    +
    +  private long getExecutionTime(TimeUnit unit) {
    +    return unit.convert(executionTime, TimeUnit.NANOSECONDS);
    +  }
    +
    +
    +  /**
    +   * Execute the list of runnables with the given parallelization.  At end, return values and report completion time
    +   * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout exceeds, existing/pending
    +   * tasks will be cancelled and a {@link UserException} is thrown.
    +   * @param activity Name of activity for reporting in logger.
    +   * @param logger The logger to use to report results.
    +   * @param tasks List of callable that should be executed and timed.  If this list has one item, task will be
    +   *                  completed in-thread. Each callable must handle {@link InterruptedException}s.
    +   * @param parallelism  The number of threads that should be run to complete this task.
    +   * @return The list of outcome objects.
    +   * @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
    +   */
    +  public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism) throws IOException {
    +    Preconditions.checkArgument(!Preconditions.checkNotNull(tasks).isEmpty(), "list of tasks is empty");
    +    parallelism = Math.min(parallelism, tasks.size());
    +    final ExecutorService threadPool = parallelism == 1 ? MoreExecutors.newDirectExecutorService()
    +        : Executors.newFixedThreadPool(parallelism, new ThreadFactoryBuilder().setNameFormat(activity + "-%d").build());
    +    final long timeout = (long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * tasks.size())/parallelism);
    --- End diff --
    
    IMO, this part of the original logic needs to be changed to `TIMEOUT_PER_RUNNABLE_IN_MSECS * (long)Math.ceil(tasks.size()/parallelism)` instead of `(long)Math.ceil((TIMEOUT_PER_RUNNABLE_IN_MSECS * tasks.size())/parallelism)`.
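Both expressions discussed above perform integer division before `Math.ceil` is applied, so the ceiling never rounds anything up. The sketch below evaluates them for 3 tasks on 2 threads and compares them with integer ceiling division, `(a + b - 1) / b`. It assumes the intent is a 15 s budget per "wave" of tasks running in parallel; that is an interpretation of the review comment, not something stated in the PR, and the method names are hypothetical.

```java
// Hypothetical helper comparing the timeout formulas.
final class TimeoutMath {
  static final long TIMEOUT_PER_TASK_MS = 15000;

  // Expression from the quoted diff: long / int is integral division,
  // so Math.ceil receives an already-truncated value.
  static long original(int tasks, int parallelism) {
    return (long) Math.ceil((TIMEOUT_PER_TASK_MS * tasks) / parallelism);
  }

  // Expression suggested in the review: tasks / parallelism is int division,
  // so Math.ceil again receives a truncated value.
  static long suggested(int tasks, int parallelism) {
    return TIMEOUT_PER_TASK_MS * (long) Math.ceil(tasks / parallelism);
  }

  // Number of waves computed with integer ceiling division.
  static long perWave(int tasks, int parallelism) {
    return TIMEOUT_PER_TASK_MS * ((tasks + parallelism - 1) / parallelism);
  }

  public static void main(String[] args) {
    // 3 tasks on 2 threads need 2 rounds of execution.
    System.out.println(original(3, 2));  // 22500
    System.out.println(suggested(3, 2)); // 15000
    System.out.println(perWave(3, 2));   // 30000
  }
}
```

When the task count is a multiple of the parallelism (for example 4 tasks on 2 threads) all three formulas agree on 30000 ms; they diverge only when the last wave is partially filled.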


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r184622525
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution = 0;
    +    long maxExecution = 0;
    +    int startedCount = 0;
    +    private int doneCount = 0;
    +    // measure thread creation times
    +    long earliestStart = Long.MAX_VALUE;
    +    long latestStart = 0;
    +    long totalStart = 0;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    --- End diff --
    
    I still think it should be at debug level; this message does not carry any useful information for the user. If you query many Parquet files, the log will be overloaded with these messages.


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    @vrozov Thanks for making the changes. The code changes look good to me.


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r183397526
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,258 @@
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    --- End diff --
    
    The "info" level here is intentional. It reports a `callable` that did not complete or never started. The regular execution path assumes that all tasks complete.
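The classification that `Statistics.accept` performs in the quoted diff (never started, started but unfinished, completed) can be sketched as below. A plain value class stands in for `TimedCallable`, and all names are hypothetical; the sentinels mirror the quoted code, where a negative start offset means the task never started and an execution time of `-1` means it did not finish. In the expected case every task takes the "completed" branch, so the per-task log lines debated here come only from the other two branches.

```java
import java.util.List;

// Hypothetical sketch of the Statistics accounting, not Drill's class.
final class TaskStatsSketch {
  static final class TaskTiming {
    final long startOffsetNanos; // offset from batch start; negative means never started
    final long executionNanos;   // -1 means started but not finished

    TaskTiming(long startOffsetNanos, long executionNanos) {
      this.startOffsetNanos = startOffsetNanos;
      this.executionNanos = executionNanos;
    }
  }

  int count, started, done, unfinished, neverStarted;
  long totalExecution, maxExecution;

  TaskStatsSketch collect(List<TaskTiming> tasks) {
    for (TaskTiming t : tasks) {
      count++;
      if (t.startOffsetNanos < 0) {
        neverStarted++; // the quoted code logs "never commenced execution" here
        continue;
      }
      started++;
      if (t.executionNanos == -1) {
        unfinished++;   // the quoted code logs "did not finish" here
        continue;
      }
      done++;
      totalExecution += t.executionNanos;
      maxExecution = Math.max(maxExecution, t.executionNanos);
    }
    return this;
  }

  public static void main(String[] args) {
    TaskStatsSketch s = new TaskStatsSketch().collect(List.of(
        new TaskTiming(0, 5), new TaskTiming(2, 7), new TaskTiming(3, -1), new TaskTiming(-1, -1)));
    System.out.println("started " + s.started + " of " + s.count + ", completed " + s.done);
    // started 3 of 4, completed 2
  }
}
```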


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    @vrozov there is a TODO about fork-join; did you consider it? My main point: if you did and it's not applicable, please remove the TODO. If you did not, leave it as is.
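One data point for evaluating that TODO: `ForkJoinPool` implements `ExecutorService`, so an `invokeAll`-with-timeout call site works unchanged when only the pool construction is swapped. The sketch below assumes the batch is submitted through `ExecutorService` (not confirmed by the truncated diff), and the names are hypothetical. Whether fork-join brings any benefit for a flat list of independent I/O tasks is a separate question that this sketch does not answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the same call site runs on any ExecutorService.
final class ForkJoinSwapSketch {
  static List<Integer> squares(ExecutorService pool, int n, long timeoutMs) throws Exception {
    List<Callable<Integer>> tasks = new ArrayList<>();
    for (int i = 1; i <= n; i++) {
      final int k = i;
      tasks.add(() -> k * k);
    }
    List<Integer> results = new ArrayList<>();
    for (Future<Integer> f : pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS)) {
      results.add(f.get()); // throws CancellationException if the task timed out
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    ForkJoinPool pool = new ForkJoinPool(2); // parallelism of 2
    try {
      System.out.println(squares(pool, 4, 1_000)); // [1, 4, 9, 16]
    } finally {
      pool.shutdown();
    }
  }
}
```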


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r183330705
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,258 @@
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    --- End diff --
    
    I suggest we move it to debug level, here and below. (I made the same changes in my PR-1214 as part of `Reduced excessive logging when parquet files metadata is read.`)


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    I had the same question as Arina about the need for this change, and I could not get enough information from DRILL-5908 about the cause of the problem and how this change will help fix it. Can you elucidate?


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    @parthchandra Please review.
    
    Note to a committer: please do *not* squash commits.


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r184724657
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution;
    +    long maxExecution;
    +    int count;
    +    int startedCount;
    +    private int doneCount;
    +    // measure thread creation times
    +    long earliestStart;
    +    long latestStart;
    +    long totalStart;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      count++;
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    +        }
    +      } else {
    +        logger.info("Task {} never commenced execution", task);
    +      }
    +    }
    +
    +    Statistics<V> collect(final List<TimedCallable<V>> tasks) {
    +      totalExecution = maxExecution = 0;
    +      count = startedCount = doneCount = 0;
    +      earliestStart = Long.MAX_VALUE;
    +      latestStart = totalStart = 0;
    +      tasks.forEach(this);
    +      return this;
    +    }
    +
    +    void info(final String activity, final Logger logger, int parallelism) {
    +      if (startedCount > 0) {
    +        logger.info("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).",
    +            activity, startedCount, count, parallelism,
    +            TimeUnit.NANOSECONDS.toMillis(earliestStart),
    +            TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
    +            TimeUnit.NANOSECONDS.toMillis(latestStart));
    +      } else {
    +        logger.info("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism);
    +      }
    +
    +      if (doneCount > 0) {
    +        logger.info("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).",
    --- End diff --
    
    Sure, but in the future, I'd recommend:
    - not mixing unrelated changes in a single JIRA/PR
    - reviewing the usage of INFO logging in a separate JIRA/PR
    - reviewing the production log level, also in a separate JIRA/PR (it is uncommon to run production at the INFO level that Drill currently uses; INFO level is usually used in dev deployments).
    
    Please also see my comment in #1214.


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    This step is necessary to do RCA for DRILL-5908. There are far too many issues with the current implementation to list them in the JIRA or the PR, and the major one is the use of homegrown solutions where Java (or third-party libraries) already provides the required functionality out of the box. There is no need to use `Runnable` instead of `Callable` and then reimplement `Callable` functionality. It is not necessary to wait on a `CountDownLatch` when `ExecutorService` can already invoke all tasks and return the results when all tasks complete or a timeout expires.
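    As an illustration of the point about `CountDownLatch` versus the executor's built-in support, here is a minimal sketch (not Drill's actual `TimedCallable` code) that uses `ExecutorService.invokeAll` with a timeout. The class name `InvokeAllSketch` and the `runAll` helper with its parameters are hypothetical; only the `java.util.concurrent` calls are standard API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Drill code: run Callables in parallel with one overall timeout.
public class InvokeAllSketch {

  public static List<Integer> runAll(List<Callable<Integer>> tasks, long timeoutMs, int threads)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      // invokeAll returns when all tasks are done or the timeout expires;
      // tasks that have not completed by then are cancelled.
      List<Future<Integer>> futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
      List<Integer> results = new ArrayList<>();
      for (Future<Integer> f : futures) {
        if (f.isCancelled()) {
          results.add(null); // timed out
        } else {
          try {
            results.add(f.get()); // does not block: the future is already done
          } catch (ExecutionException e) {
            results.add(null); // the task threw; e.getCause() holds the original exception
          }
        }
      }
      return results;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

    No latch and no manual bookkeeping of completed tasks are needed: with a timeout shorter than a slow task, that task's slot in the result list is expected to be `null` because `invokeAll` cancelled it.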


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r184691926
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution = 0;
    +    long maxExecution = 0;
    +    int startedCount = 0;
    +    private int doneCount = 0;
    +    // measure thread creation times
    +    long earliestStart = Long.MAX_VALUE;
    +    long latestStart = 0;
    +    long totalStart = 0;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    --- End diff --
    
    If all tasks complete within the timeout as expected, there will be no log entry.
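    A simplified, hypothetical sketch of the timing bookkeeping discussed here: each task records whether it started and how long it ran, with `-1` meaning it did not complete, and only incomplete tasks produce a report line. `TimedTaskSketch` and `unfinished` are illustrative names, not the Drill classes; the `TimedCallable` in the diff logs through slf4j instead of returning strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for the TimedCallable bookkeeping (not Drill code).
public class TimedTaskSketch<V> implements Callable<V> {
  private final String name;
  private final Callable<V> delegate;
  private volatile boolean started = false;
  private volatile long startNanos;
  private volatile long executionNanos = -1; // -1 = did not complete successfully

  public TimedTaskSketch(String name, Callable<V> delegate) {
    this.name = name;
    this.delegate = delegate;
  }

  @Override
  public V call() throws Exception {
    startNanos = System.nanoTime();
    started = true;
    V result = delegate.call();
    // Recorded only on normal completion; a task that throws, or is still
    // running when the report is built, keeps executionNanos == -1.
    executionNanos = System.nanoTime() - startNanos;
    return result;
  }

  // One message per task that did not complete; an empty list means
  // every task finished, i.e. nothing would be logged.
  public static List<String> unfinished(List<? extends TimedTaskSketch<?>> tasks) {
    List<String> messages = new ArrayList<>();
    for (TimedTaskSketch<?> t : tasks) {
      if (!t.started) {
        messages.add("Task " + t.name + " never commenced execution");
      } else if (t.executionNanos == -1) {
        messages.add("Task " + t.name + " started but did not finish");
      }
    }
    return messages;
  }
}
```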


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    @vrozov could you please post a few lines about the changes made during the refactoring, the reasons, and the benefits?


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r184701824
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution;
    +    long maxExecution;
    +    int count;
    +    int startedCount;
    +    private int doneCount;
    +    // measure thread creation times
    +    long earliestStart;
    +    long latestStart;
    +    long totalStart;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      count++;
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    +        }
    +      } else {
    +        logger.info("Task {} never commenced execution", task);
    +      }
    +    }
    +
    +    Statistics<V> collect(final List<TimedCallable<V>> tasks) {
    +      totalExecution = maxExecution = 0;
    +      count = startedCount = doneCount = 0;
    +      earliestStart = Long.MAX_VALUE;
    +      latestStart = totalStart = 0;
    +      tasks.forEach(this);
    +      return this;
    +    }
    +
    +    void info(final String activity, final Logger logger, int parallelism) {
    +      if (startedCount > 0) {
    +        logger.info("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).",
    +            activity, startedCount, count, parallelism,
    +            TimeUnit.NANOSECONDS.toMillis(earliestStart),
    +            TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
    +            TimeUnit.NANOSECONDS.toMillis(latestStart));
    +      } else {
    +        logger.info("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism);
    +      }
    +
    +      if (doneCount > 0) {
    +        logger.info("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).",
    --- End diff --
    
    It has already been changed to debug in PR-1214; please make sure to preserve those changes when resolving the conflicts.


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    I did not change how tasks (`Runnable` or `Callable`) behave and did not look into converting `Callable/Runnable` to a `ForkJoinTask`. Whether existing tasks can be scheduled recursively depends on the nature of those tasks and is outside the scope of this PR. I'd suggest filing a JIRA, if one does not already exist, to look into `Fork/Join` (this is what I would expect from the developer who added the "TODO").
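    For context on the Fork/Join question, here is a minimal sketch of what converting an existing `Callable` into a `ForkJoinTask` can look like using the standard `ForkJoinTask.adapt` method. The class `ForkJoinAdaptSketch` and its `runAll` helper are hypothetical. Adapting alone does not make a task recursive: each `Callable` still runs as one indivisible unit, which is why the nature of the tasks decides whether Fork/Join brings any benefit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// Hypothetical helper, not Drill code: run plain Callables in a ForkJoinPool.
public class ForkJoinAdaptSketch {

  public static <V> List<V> runAll(List<Callable<V>> callables, int parallelism) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      List<ForkJoinTask<V>> tasks = new ArrayList<>();
      for (Callable<V> c : callables) {
        // adapt() wraps the Callable; it does not split it into subtasks
        tasks.add(pool.submit(ForkJoinTask.adapt(c)));
      }
      List<V> results = new ArrayList<>();
      for (ForkJoinTask<V> t : tasks) {
        results.add(t.join()); // join() rethrows task failures as unchecked exceptions
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }
}
```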


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    Fair enough. But that *still* does not give me a clue about the problem(s) you were trying to fix, or how the refactoring helps. Is the cause of the problem in TimedRunnable?
    Not an unreasonable request, is it?



---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r184693216
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution = 0;
    +    long maxExecution = 0;
    +    int startedCount = 0;
    +    private int doneCount = 0;
    +    // measure thread creation times
    +    long earliestStart = Long.MAX_VALUE;
    +    long latestStart = 0;
    +    long totalStart = 0;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    --- End diff --
    
    warning instead of info?


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by arina-ielchiieva <gi...@git.apache.org>.
Github user arina-ielchiieva commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r184693553
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution;
    +    long maxExecution;
    +    int count;
    +    int startedCount;
    +    private int doneCount;
    +    // measure thread creation times
    +    long earliestStart;
    +    long latestStart;
    +    long totalStart;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      count++;
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    +        }
    +      } else {
    +        logger.info("Task {} never commenced execution", task);
    +      }
    +    }
    +
    +    Statistics<V> collect(final List<TimedCallable<V>> tasks) {
    +      totalExecution = maxExecution = 0;
    +      count = startedCount = doneCount = 0;
    +      earliestStart = Long.MAX_VALUE;
    +      latestStart = totalStart = 0;
    +      tasks.forEach(this);
    +      return this;
    +    }
    +
    +    void info(final String activity, final Logger logger, int parallelism) {
    +      if (startedCount > 0) {
    +        logger.info("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).",
    +            activity, startedCount, count, parallelism,
    +            TimeUnit.NANOSECONDS.toMillis(earliestStart),
    +            TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
    +            TimeUnit.NANOSECONDS.toMillis(latestStart));
    +      } else {
    +        logger.info("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism);
    +      }
    +
    +      if (doneCount > 0) {
    +        logger.info("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).",
    --- End diff --
    
    What about this info?


---

[GitHub] drill issue #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1238
  
    There is not enough information available to debug or troubleshoot DRILL-5908. Rather than trying to find bugs in a homegrown solution, I prefer to replace it with out-of-the-box Java functionality and, at the same time, make it possible to log enough information to do RCA for DRILL-5908.
    
    IMO, there are no unreasonable requests on PR/JIRA 😄. 
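Replacing a homegrown solution with out-of-the-box Java functionality could look roughly like the following minimal sketch. It is not the code in this PR. It relies on `ExecutorService.invokeAll(tasks, timeout, unit)`, which waits until every task completes or the timeout expires, and cancels the tasks that are still unfinished. The class `InvokeAllSketch` and the method `runAll` are hypothetical names; the error handling only mirrors the idea of `FutureMapper` in the diff, where the first failure is kept and later ones are attached via `addSuppressed`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllSketch {

  /**
   * Hypothetical helper: runs all tasks on a fixed-size pool, waiting at most
   * timeoutMillis in total. invokeAll cancels any task not completed by then.
   * The first failure is thrown; later failures are attached with addSuppressed.
   */
  public static <V> List<V> runAll(List<? extends Callable<V>> tasks, int parallelism,
      long timeoutMillis) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      // Futures come back in the same order as the tasks, and all of them are done.
      List<Future<V>> futures = pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
      List<V> results = new ArrayList<>();
      Exception failure = null;
      for (Future<V> future : futures) {
        Exception error = null;
        if (future.isCancelled()) {
          error = new CancellationException("Task did not complete within " + timeoutMillis + " ms");
        } else {
          try {
            results.add(future.get()); // does not block: the future is already done
          } catch (ExecutionException e) {
            error = e.getCause() instanceof Exception ? (Exception) e.getCause() : e;
          }
        }
        if (error != null) {
          if (failure == null) {
            failure = error;
          } else {
            failure.addSuppressed(error);
          }
        }
      }
      if (failure != null) {
        throw failure;
      }
      return results;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<Integer>> tasks = new ArrayList<>();
    for (int i = 1; i <= 4; i++) {
      final int n = i;
      tasks.add(() -> n * n);
    }
    System.out.println(runAll(tasks, 2, 1000)); // [1, 4, 9, 16]
  }
}
```

Because `invokeAll` applies one overall timeout to the whole batch, the caller does not need its own wait loop; per-task start and execution times still have to be recorded separately if they are needed for RCA.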


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r184694930
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution;
    +    long maxExecution;
    +    int count;
    +    int startedCount;
    +    private int doneCount;
    +    // measure thread creation times
    +    long earliestStart;
    +    long latestStart;
    +    long totalStart;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      count++;
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    +        }
    +      } else {
    +        logger.info("Task {} never commenced execution", task);
    +      }
    +    }
    +
    +    Statistics<V> collect(final List<TimedCallable<V>> tasks) {
    +      totalExecution = maxExecution = 0;
    +      count = startedCount = doneCount = 0;
    +      earliestStart = Long.MAX_VALUE;
    +      latestStart = totalStart = 0;
    +      tasks.forEach(this);
    +      return this;
    +    }
    +
    +    void info(final String activity, final Logger logger, int parallelism) {
    +      if (startedCount > 0) {
    +        logger.info("{}: started {} out of {} using {} threads. (start time: min {} ms, avg {} ms, max {} ms).",
    +            activity, startedCount, count, parallelism,
    +            TimeUnit.NANOSECONDS.toMillis(earliestStart),
    +            TimeUnit.NANOSECONDS.toMillis(totalStart) / startedCount,
    +            TimeUnit.NANOSECONDS.toMillis(latestStart));
    +      } else {
    +        logger.info("{}: started {} out of {} using {} threads.", activity, startedCount, count, parallelism);
    +      }
    +
    +      if (doneCount > 0) {
    +        logger.info("{}: completed {} out of {} using {} threads (execution time: total {} ms, avg {} ms, max {} ms).",
    --- End diff --
    
    This is the original log level from TimedRunnable. If there is currently excessive logging (which I doubt, since a production system should use WARN or ERROR), please file a JIRA to change it.
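The `Statistics` class in the diff consumes two values per task: when it started and how long it ran, with `-1` meaning the task did not finish (the diff initializes `executionTime = -1`). The body of `TimedCallable.call()` is not part of the quoted hunk, so the following is only a hypothetical sketch of such a timing wrapper. `TimingCallable`, `isStarted`, and `getStartOffset` are illustrative names, and the sketch uses an explicit `started` flag instead of the diff's `startTime = 0` sentinel.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical wrapper (not Drill's TimedCallable) that records when a task
 * started and how long it ran, so a caller can report tasks that never
 * started or never finished.
 */
public class TimingCallable<V> implements Callable<V> {
  private final Callable<V> delegate;
  private volatile boolean started = false;
  private volatile long startNanos;          // System.nanoTime() when call() began
  private volatile long executionNanos = -1; // stays -1 until call() returns or throws

  public TimingCallable(Callable<V> delegate) {
    this.delegate = delegate;
  }

  @Override
  public V call() throws Exception {
    startNanos = System.nanoTime();
    started = true; // volatile write after startNanos, so readers see both
    try {
      return delegate.call();
    } finally {
      // Recorded on both normal and exceptional completion.
      executionNanos = System.nanoTime() - startNanos;
    }
  }

  public boolean isStarted() {
    return started;
  }

  /** Start time relative to batchStartNanos, or -1 if the task never started. */
  public long getStartOffset(long batchStartNanos, TimeUnit unit) {
    return started ? unit.convert(startNanos - batchStartNanos, TimeUnit.NANOSECONDS) : -1;
  }

  /** Execution time, or -1 if the task has not finished. */
  public long getExecutionTime(TimeUnit unit) {
    long nanos = executionNanos;
    return nanos == -1 ? -1 : unit.convert(nanos, TimeUnit.NANOSECONDS);
  }

  public static void main(String[] args) throws Exception {
    long batchStart = System.nanoTime();
    TimingCallable<String> task = new TimingCallable<>(() -> "done");
    System.out.println(task.getExecutionTime(TimeUnit.NANOSECONDS));                // -1
    System.out.println(task.call());                                                // done
    System.out.println(task.getExecutionTime(TimeUnit.NANOSECONDS) >= 0);           // true
    System.out.println(task.getStartOffset(batchStart, TimeUnit.NANOSECONDS) >= 0); // true
  }
}
```

Whether the log lines built from these values are emitted at INFO or a lower level is then purely a logger configuration question.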


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r183578626
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java ---
    @@ -66,8 +69,8 @@ private static void checkMagicBytes(FileStatus status, byte[] data, int offset)
       }
     
       public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
    -    final List<TimedRunnable<Footer>> readers = Lists.newArrayList();
    -    List<Footer> foundFooters = Lists.newArrayList();
    +    final List<TimedCallable<Footer>> readers = new ArrayList<>();
    +    final List<Footer> foundFooters = new ArrayList<>();
    --- End diff --
    
    Please see the Guava Javadoc for [Lists.newArrayList()](http://google.github.io/guava/releases/snapshot/api/docs/com/google/common/collect/Lists.html#newArrayList--):
    
    > **Note for Java 7 and later:** this method is now unnecessary and should be treated as deprecated. Instead, use the ArrayList [constructor](https://docs.oracle.com/javase/9/docs/api/java/util/ArrayList.html?is-external=true#ArrayList--) directly, taking advantage of the new ["diamond" syntax](http://goo.gl/iz2Wi).
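The Javadoc note quoted above comes down to letting the compiler infer the type argument. A minimal JDK-only illustration; the class `DiamondExample` and the method `newFooterList` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class DiamondExample {
  // Since Java 7 the diamond <> infers <String> from the target type (here the
  // method's return type), so Guava's no-argument Lists.newArrayList() adds nothing.
  static List<String> newFooterList() {
    return new ArrayList<>();
  }

  public static void main(String[] args) {
    List<String> footers = newFooterList();
    footers.add("footer-1");
    System.out.println(footers); // [footer-1]
  }
}
```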


---

[GitHub] drill pull request #1238: DRILL-6281: Refactor TimedRunnable

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1238#discussion_r183576093
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Objects;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.stream.Collectors;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.base.Stopwatch;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +
    +/**
    + * Class used to allow parallel executions of tasks in a simplified way. Also maintains and reports timings of task completion.
    + * TODO: look at switching to fork join.
    + * @param <V> The time value that will be returned when the task is executed.
    + */
    +public abstract class TimedCallable<V> implements Callable<V> {
    +  private static final Logger logger = LoggerFactory.getLogger(TimedCallable.class);
    +
    +  private static long TIMEOUT_PER_RUNNABLE_IN_MSECS = 15000;
    +
    +  private volatile long startTime = 0;
    +  private volatile long executionTime = -1;
    +
    +  private static class FutureMapper<V> implements Function<Future<V>, V> {
    +    int count;
    +    Throwable throwable = null;
    +
    +    private void setThrowable(Throwable t) {
    +      if (throwable == null) {
    +        throwable = t;
    +      } else {
    +        throwable.addSuppressed(t);
    +      }
    +    }
    +
    +    @Override
    +    public V apply(Future<V> future) {
    +      Preconditions.checkState(future.isDone());
    +      if (!future.isCancelled()) {
    +        try {
    +          count++;
    +          return future.get();
    +        } catch (InterruptedException e) {
    +          // there is no wait as we are getting result from the completed/done future
    +          logger.error("Unexpected exception", e);
    +          throw UserException.internalError(e)
    +              .message("Unexpected exception")
    +              .build(logger);
    +        } catch (ExecutionException e) {
    +          setThrowable(e.getCause());
    +        }
    +      } else {
    +        setThrowable(new CancellationException());
    +      }
    +      return null;
    +    }
    +  }
    +
    +  private static class Statistics<V> implements Consumer<TimedCallable<V>> {
    +    final long start = System.nanoTime();
    +    final Stopwatch watch = Stopwatch.createStarted();
    +    long totalExecution = 0;
    +    long maxExecution = 0;
    +    int startedCount = 0;
    +    private int doneCount = 0;
    +    // measure thread creation times
    +    long earliestStart = Long.MAX_VALUE;
    +    long latestStart = 0;
    +    long totalStart = 0;
    +
    +    @Override
    +    public void accept(TimedCallable<V> task) {
    +      long threadStart = task.getStartTime(TimeUnit.NANOSECONDS) - start;
    +      if (threadStart >= 0) {
    +        startedCount++;
    +        earliestStart = Math.min(earliestStart, threadStart);
    +        latestStart = Math.max(latestStart, threadStart);
    +        totalStart += threadStart;
    +        long executionTime = task.getExecutionTime(TimeUnit.NANOSECONDS);
    +        if (executionTime != -1) {
    +          doneCount++;
    +          totalExecution += executionTime;
    +          maxExecution = Math.max(maxExecution, executionTime);
    +        } else {
    +          logger.info("Task {} started at {} did not finish", task, threadStart);
    +        }
    +      } else {
    +        logger.info("Task {} never commenced execution", task);
    +      }
    +    }
    +
    +    void info(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism) {
    +      tasks.forEach(this);
    --- End diff --
    
    Would it be better to keep track of whether the statistics have already been collected by `tasks.forEach(this)` and have `info` only print them once collected, instead of calling `tasks.forEach(this)` on every invocation of `info`? I know that at this point `info` is called only once, but if another call is introduced in the future, the repeated iteration can be avoided.
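The suggestion above, and the `collect()` method that appears in the later revision of the diff (which resets the counters before calling `tasks.forEach(this)`), can be illustrated with a small stand-alone aggregator. In the revision quoted here the counters are only initialized at declaration, so a second `tasks.forEach(this)` would add to them again. This is a hypothetical sketch, not Drill code: `TaskStats`, `Timing`, and `summary()` are illustrative names, and per-task timings are plain numbers rather than `TimedCallable` instances.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical aggregator: statistics are gathered in a separate step that
 * resets its counters first, so reporting can run any number of times
 * without double-counting.
 */
public class TaskStats {
  /** Per-task timings in nanoseconds; -1 means "never started" or "did not finish". */
  public static final class Timing {
    final long startOffset;
    final long executionTime;

    public Timing(long startOffset, long executionTime) {
      this.startOffset = startOffset;
      this.executionTime = executionTime;
    }
  }

  int count;
  int startedCount;
  int doneCount;
  long earliestStart;
  long latestStart;
  long totalStart;
  long totalExecution;
  long maxExecution;
  private boolean collected = false;

  /** Resets all counters, then aggregates; calling it again yields the same numbers. */
  public TaskStats collect(List<Timing> tasks) {
    count = startedCount = doneCount = 0;
    earliestStart = Long.MAX_VALUE;
    latestStart = totalStart = totalExecution = maxExecution = 0;
    for (Timing t : tasks) {
      count++;
      if (t.startOffset < 0) {
        continue; // never started
      }
      startedCount++;
      earliestStart = Math.min(earliestStart, t.startOffset);
      latestStart = Math.max(latestStart, t.startOffset);
      totalStart += t.startOffset;
      if (t.executionTime != -1) {
        doneCount++;
        totalExecution += t.executionTime;
        maxExecution = Math.max(maxExecution, t.executionTime);
      }
    }
    collected = true;
    return this;
  }

  /** Only formats already-collected numbers; it never re-iterates the tasks. */
  public String summary() {
    if (!collected) {
      throw new IllegalStateException("collect() has not been called");
    }
    return String.format("started %d of %d, completed %d, max execution %d ns",
        startedCount, count, doneCount, maxExecution);
  }

  public static void main(String[] args) {
    List<Timing> tasks = new ArrayList<>();
    tasks.add(new Timing(10, 100));
    tasks.add(new Timing(20, -1)); // started, did not finish
    tasks.add(new Timing(-1, -1)); // never started
    TaskStats stats = new TaskStats().collect(tasks);
    System.out.println(stats.summary()); // started 2 of 3, completed 1, max execution 100 ns
    System.out.println(stats.collect(tasks).summary()); // same line: no double-counting
  }
}
```

Separating collection from reporting keeps `summary()` (or a logging method in its place) free of side effects, which is what makes repeated calls safe.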


---