You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/10/08 20:52:54 UTC

[drill] branch master updated: DRILL-6410: Fixed memory leak in flat Parquet reader

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 55ac523  DRILL-6410: Fixed memory leak in flat Parquet reader
55ac523 is described below

commit 55ac52390877dcaa4a811f570d1eb41aca240bf0
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Mon Oct 8 10:16:44 2018 -0700

    DRILL-6410: Fixed memory leak in flat Parquet reader
---
 .../parquet/columnreaders/AsyncPageReader.java     |   9 +-
 .../exec/util/concurrent/ExecutorServiceUtil.java  | 194 ++++++++++
 .../drill/exec/util/ExecutorServiceUtilTest.java   | 419 +++++++++++++++++++++
 exec/jdbc-all/pom.xml                              |   4 +-
 4 files changed, 620 insertions(+), 6 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 1ffde31..e429fb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
 import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -133,7 +134,7 @@ class AsyncPageReader extends PageReader {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
-      asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+      asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
@@ -230,7 +231,7 @@ class AsyncPageReader extends PageReader {
           //if the queue was full before we took a page out, then there would
           // have been no new read tasks scheduled. In that case, schedule a new read.
           if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-            asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+            asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
           }
         }
       } finally {
@@ -264,7 +265,7 @@ class AsyncPageReader extends PageReader {
             //if the queue was full before we took a page out, then there would
             // have been no new read tasks scheduled. In that case, schedule a new read.
             if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-              asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+              asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
             }
           }
           pageHeader = readStatus.getPageHeader();
@@ -456,7 +457,7 @@ class AsyncPageReader extends PageReader {
           // if the queue is not full, schedule another read task immediately. If it is then the consumer
           // will schedule a new read task as soon as it removes a page from the queue.
           if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
-            asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
+            asyncPageRead.offer(ExecutorServiceUtil.submit(parent.threadPool, new AsyncPageReaderTask(debugName, queue)));
           }
         }
         // Do nothing.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java
new file mode 100644
index 0000000..163b6ba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.concurrent;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/** Utility class to enhance the Java {@link ExecutorService} class functionality. */
+public final class ExecutorServiceUtil {
+
+  /**
+   * Submits the callable task and wraps the executor's future in a decorated future whose
+   * {@link Future#cancel(boolean)} method blocks, when "mayInterruptIfRunning" is true, until the
+   * task is no longer executing. Once such a cancel call returns, the task is neither running nor
+   * able to start later.
+   *
+   * @param service the executor service
+   * @param callable a callable task
+   * @param <T> the result type of the task
+   * @return decorated future object upon successful submission
+   * @see ExecutorService#submit(Callable)
+   */
+  public static <T> Future<T> submit(ExecutorService service, Callable<T> callable) {
+    // Wrap the original callable object
+    CallableTaskWrapper<T> wrapper = new CallableTaskWrapper<T>(callable);
+    // Submit the wrapper and record the executor's future within it. The task may already be
+    // executing before setFuture() runs, which is why the wrapper tolerates a null future.
+    wrapper.setFuture(service.submit(wrapper));
+
+    return wrapper;
+  }
+
+  /**
+   * Executor task wrapper to enhance task cancellation behavior.
+   *
+   * <p>The wrapper is both the {@link Callable} handed to the executor and the {@link Future}
+   * returned to the caller; cancellation and result retrieval are delegated to the executor's
+   * future, while the wrapper tracks whether the wrapped task is actually executing.
+   *
+   * @param <T> the result type of the task
+   */
+  public static final class CallableTaskWrapper<T> implements Callable<T>, Future<T> {
+    /** Callable object */
+    private final Callable<T> callableTask;
+    /** Future object returned after submission of this wrapper to the executor */
+    private volatile Future<T> future;
+    /** Captures the callable task execution status; only modified while holding {@link #monitor} */
+    private volatile State state = State.NOT_RUNNING;
+    /** Monitor used to make state transitions atomic and to signal the end of the execution */
+    private final Object monitor = new Object();
+
+    /** Captures task's execution state */
+    private enum State {
+      NOT_RUNNING,
+      RUNNING,
+      DONE
+    }
+
+    /**
+     * CTOR.
+     * @param callableTask original callable task; must not be null
+     */
+    public CallableTaskWrapper(Callable<T> callableTask) {
+      this.callableTask = Preconditions.checkNotNull(callableTask);
+    }
+
+    /**
+     * Runs the wrapped task, unless this wrapper has already been cancelled.
+     *
+     * @throws CancellationException if the wrapper was cancelled before the task started
+     */
+    @Override
+    public T call() throws Exception {
+      synchronized (monitor) {
+        // Cancellation can race with the executor invoking this method. Checking for it and
+        // publishing RUNNING atomically (under the monitor waitTillDone() also uses) ensures that
+        // either cancel(true) observes the task as running and waits for it, or the task observes
+        // the cancellation and never starts.
+        if (isCancelled()) {
+          state = State.DONE;
+          throw new CancellationException("Task was cancelled before it started");
+        }
+        state = State.RUNNING;
+      }
+
+      try {
+        return callableTask.call();
+      } finally {
+        synchronized (monitor) {
+          state = State.DONE;
+          // Wake up any thread blocked in cancel(true)
+          monitor.notifyAll();
+        }
+      }
+    }
+
+    /**
+     * Attempts to cancel the task. When the "mayInterruptIfRunning" flag is set, this method blocks
+     * until the callable task is no longer executing, so that on return the task is neither running
+     * nor able to start later. If the current thread is interrupted while waiting, it keeps waiting;
+     * no exception is thrown and the thread's interrupted flag is set again before returning.
+     *
+     * @see Future#cancel(boolean)
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      try {
+        return future.cancel(mayInterruptIfRunning);
+      } finally {
+        // If the caller wishes immediate completion of the task, then block this thread till the
+        // callable task is done executing.
+        if (mayInterruptIfRunning) {
+          waitTillDone();
+        }
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isCancelled() {
+      // this method could be called from the call() API before the future is set
+      return future != null && future.isCancelled();
+    }
+
+    /**
+     * @return true if the task has completed execution, or if it was cancelled before it started
+     *         executing (it will then never execute); unlike a plain future, a cancelled task that
+     *         is still running is not considered done
+     */
+    @Override
+    public boolean isDone() {
+      synchronized (monitor) {
+        // A task cancelled while still queued never reaches DONE through call()
+        return state == State.DONE || (state == State.NOT_RUNNING && isCancelled());
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      return future.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return future.get(timeout, unit);
+    }
+
+    /**
+     * @param future the executor's future for this task
+     */
+    public void setFuture(Future<T> future) {
+      this.future = future;
+    }
+
+    /**
+     * Blocks until the task is no longer running. Interrupts received while waiting do not end
+     * the wait; the interrupted flag is restored before returning.
+     */
+    private void waitTillDone() {
+      boolean interrupted = false;
+
+      try {
+        // No unsynchronized fast path: call() checks for cancellation and publishes RUNNING
+        // under this monitor, so the state must be read under it too.
+        synchronized (monitor) {
+          while (isRunning()) {
+            try {
+              monitor.wait();
+            } catch (InterruptedException e) {
+              interrupted = true;
+            }
+          }
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    private boolean isRunning() {
+      return state == State.RUNNING;
+    }
+  }
+
+// ----------------------------------------------------------------------------
+// Local implementation
+// ----------------------------------------------------------------------------
+
+  /** Disabling object instantiation */
+  private ExecutorServiceUtil() {
+
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java
new file mode 100644
index 0000000..e2101ad
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
+import org.apache.drill.test.DrillTest;
+import org.junit.Test;
+
+/**
+ * Tests for validating the Drill executor service utility class. Every test shuts down the
+ * executor it creates so that no worker thread outlives the test.
+ */
+public final class ExecutorServiceUtilTest extends DrillTest {
+
+  @Test
+  public void testSuccessfulExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      // Launch the tasks (using the default test parameters)
+      TestParams params = new TestParams();
+      List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+      for (int idx = 0; idx < numTasks; idx++) {
+        requests.add(RequestContainer.submit(service, params));
+      }
+
+      Outcomes outcomes = awaitOutcomes(requests);
+
+      assertEquals(numTasks, outcomes.numSuccess);
+      assertEquals(0, outcomes.numFailures);
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testFailedExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      // Launch the tasks; every one of them throws an exception
+      TestParams params = new TestParams();
+      params.generateException = true;
+      List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+      for (int idx = 0; idx < numTasks; idx++) {
+        requests.add(RequestContainer.submit(service, params));
+      }
+
+      Outcomes outcomes = awaitOutcomes(requests);
+
+      assertEquals(0, outcomes.numSuccess);
+      assertEquals(numTasks, outcomes.numFailures);
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testMixedExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      // Launch the tasks; even-numbered ones succeed, odd-numbered ones throw an exception
+      TestParams successParams = new TestParams();
+      TestParams failedParams = new TestParams();
+      failedParams.generateException = true;
+
+      int expNumFailedTasks = 0;
+      int expNumSuccessTasks = 0;
+      List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+      for (int idx = 0; idx < numTasks; idx++) {
+        boolean succeed = (idx % 2 == 0);
+
+        if (succeed) {
+          ++expNumSuccessTasks;
+        } else {
+          ++expNumFailedTasks;
+        }
+        requests.add(RequestContainer.submit(service, succeed ? successParams : failedParams));
+      }
+
+      Outcomes outcomes = awaitOutcomes(requests);
+
+      assertEquals(expNumSuccessTasks, outcomes.numSuccess);
+      assertEquals(expNumFailedTasks, outcomes.numFailures);
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testCancelExecution() throws Exception {
+    final int numThreads = 2;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      // Set the test parameters
+      TestParams params = new TestParams();
+      params.controller = new TaskExecutionController();
+
+      // Launch the task
+      RequestContainer request = RequestContainer.submit(service, params);
+
+      // Allow the task to start
+      params.controller.start();
+      params.controller.hasStarted();
+
+      // Allow the task to exit but with a delay so that we can test the blocking nature of "cancel"
+      params.controller.delayMillisOnExit = 50;
+      params.controller.exit();
+
+      // Cancel the task
+      boolean result = request.future.cancel(true);
+
+      if (result) {
+        // We were able to cancel this task. As cancel(true) blocks until the task stops running, the
+        // task must have reached a final state by now. Whether the task observed the interrupt before
+        // computing its status depends on thread scheduling, so the exact final state is not checked.
+        assertTrue(request.task.result.isFinished());
+        assertTrue(request.future.isCancelled());
+        assertTrue(request.future.isDone());
+      } else {
+        // Cancellation couldn't happen most probably because this thread got context switched
+        // for a long time (should be rare); let's make sure the task is done and successful
+        assertTrue(request.task.result.isSuccess());
+      }
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testCancelBeforeExecution() throws Exception {
+    ExecutorService service = Executors.newSingleThreadExecutor();
+
+    try {
+      // Occupy the only worker thread so that the next task remains queued
+      TestParams blockingParams = new TestParams();
+      blockingParams.controller = new TaskExecutionController();
+      RequestContainer blocking = RequestContainer.submit(service, blockingParams);
+
+      blockingParams.controller.start();
+      blockingParams.controller.hasStarted();
+
+      // Cancel a task while it is still waiting in the executor queue
+      RequestContainer queued = RequestContainer.submit(service, new TestParams());
+
+      assertTrue(queued.future.cancel(true));
+      assertTrue(queued.future.isCancelled());
+
+      // Release the blocking task and drain the executor; the cancelled task must never run
+      blockingParams.controller.exit();
+      assertTrue(blocking.future.get().isSuccess());
+
+      service.shutdown();
+      assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+      assertTrue(queued.task.result.isNotStarted());
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  /**
+   * Waits for every request to finish and tallies the outcomes. A failed task must surface as an
+   * {@link ExecutionException} and must have recorded the failure itself.
+   */
+  private static Outcomes awaitOutcomes(List<RequestContainer> requests) throws InterruptedException {
+    Outcomes outcomes = new Outcomes();
+
+    for (RequestContainer request : requests) {
+      try {
+        TaskResult result = request.future.get();
+        assertNotNull(result);
+
+        if (result.isSuccess()) {
+          ++outcomes.numSuccess;
+        }
+      } catch (ExecutionException e) {
+        assertTrue(request.task.result.isFailed());
+        ++outcomes.numFailures;
+      }
+    }
+    return outcomes;
+  }
+
+// ----------------------------------------------------------------------------
+// Internal Classes
+// ----------------------------------------------------------------------------
+
+  /** Tally of the task outcomes collected by {@link #awaitOutcomes(List)} */
+  private static final class Outcomes {
+    private int numSuccess;
+    private int numFailures;
+  }
+
+  @SuppressWarnings("unused")
+  private static final class TaskResult {
+
+    private enum ExecutionStatus {
+      NOT_STARTED,
+      RUNNING,
+      SUCCEEDED,
+      FAILED,
+      CANCELLED
+    }
+
+    // Written by the worker thread; readers rely on the future / executor for visibility
+    private ExecutionStatus status;
+
+    TaskResult() {
+      status = ExecutionStatus.NOT_STARTED;
+    }
+
+    private boolean isNotStarted() {
+      return status == ExecutionStatus.NOT_STARTED;
+    }
+
+    private boolean isSuccess() {
+      return status == ExecutionStatus.SUCCEEDED;
+    }
+
+    private boolean isFailed() {
+      return status == ExecutionStatus.FAILED;
+    }
+
+    private boolean isCancelled() {
+      return status == ExecutionStatus.CANCELLED;
+    }
+
+    private boolean isFailedOrCancelled() {
+      return isFailed() || isCancelled();
+    }
+
+    /** @return true once the task has completed, whatever the outcome */
+    private boolean isFinished() {
+      return isSuccess() || isFailedOrCancelled();
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private static final class TaskExecutionController {
+    private boolean canStart = false;
+    private boolean canExit = false;
+    private boolean started = false;
+    private boolean exited = false;
+    private int delayMillisOnExit = 0;
+    private final Object monitor = new Object();
+
+    /** Called by the task: blocks until {@link #start()} is invoked, then signals it has started. */
+    private void canStart() {
+      boolean interrupted = false;
+
+      synchronized (monitor) {
+        while (!canStart) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+        }
+        started = true;
+        monitor.notifyAll();
+      }
+      restoreInterrupt(interrupted);
+    }
+
+    /** Called by the task: blocks until {@link #exit()} is invoked, then waits the requested delay. */
+    private void canExit() {
+      boolean interrupted = false;
+
+      synchronized (monitor) {
+        while (!canExit) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+        }
+      }
+
+      // Wait requested delay time before exiting; an interrupt is recorded instead of ending the delay
+      for (int i = 0; i < delayMillisOnExit; i++) {
+        try {
+          Thread.sleep(1); // sleep 1 ms
+        } catch (InterruptedException ie) {
+          interrupted = true;
+        }
+      }
+
+      synchronized (monitor) {
+        exited = true;
+        monitor.notifyAll();
+      }
+      restoreInterrupt(interrupted);
+    }
+
+    private void start() {
+      synchronized (monitor) {
+        canStart = true;
+        monitor.notifyAll();
+      }
+    }
+
+    private void exit() {
+      synchronized (monitor) {
+        canExit = true;
+        monitor.notifyAll();
+      }
+    }
+
+    private void hasStarted() {
+      boolean interrupted = false;
+
+      synchronized (monitor) {
+        while (!started) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+        }
+      }
+      restoreInterrupt(interrupted);
+    }
+
+    private void hasExited() {
+      boolean interrupted = false;
+
+      synchronized (monitor) {
+        while (!exited) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+        }
+      }
+      restoreInterrupt(interrupted);
+    }
+
+    /**
+     * Interrupts are deferred while waiting (the wait must complete); re-assert the interrupt so
+     * that the caller can still observe it.
+     */
+    private static void restoreInterrupt(boolean interrupted) {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private static final class TestParams {
+    private int waitTimeMillis = 2;
+    private boolean generateException = false;
+    private TaskExecutionController controller = null;
+  }
+
+  private static final class CallableTask implements Callable<TaskResult> {
+    private final TaskResult result = new TaskResult();
+    private final TestParams params;
+
+    private CallableTask(TestParams params) {
+      this.params = params;
+    }
+
+    @Override
+    public TaskResult call() throws Exception {
+
+      beforeStart();
+
+      result.status = TaskResult.ExecutionStatus.RUNNING;
+      boolean interrupted = false;
+      Exception exc = null;
+
+      try {
+        for (int i = 0; i < params.waitTimeMillis; i++) {
+          try {
+            Thread.sleep(1); // sleep 1 ms
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+        }
+
+        if (params.generateException) {
+          throw new RuntimeException("Test emulated exception..");
+        }
+
+      } catch (Exception e) {
+        exc = e;
+        throw e;
+
+      } finally {
+        beforeExit();
+
+        // The controller defers interrupts received while the task was blocked; account for them
+        if (Thread.interrupted()) {
+          interrupted = true;
+        }
+
+        if (interrupted) {
+          result.status = TaskResult.ExecutionStatus.CANCELLED;
+        } else if (exc != null) {
+          result.status = TaskResult.ExecutionStatus.FAILED;
+        } else {
+          result.status = TaskResult.ExecutionStatus.SUCCEEDED;
+        }
+      }
+      return result;
+    }
+
+    private void beforeStart() {
+      if (params.controller != null) {
+        params.controller.canStart();
+      }
+    }
+
+    private void beforeExit() {
+      if (params.controller != null) {
+        params.controller.canExit();
+      }
+    }
+  }
+
+  private static final class RequestContainer {
+    private final Future<TaskResult> future;
+    private final CallableTask task;
+
+    private RequestContainer(Future<TaskResult> future, CallableTask task) {
+      this.future = future;
+      this.task   = task;
+    }
+
+    /** Creates a task with the given parameters and submits it through {@link ExecutorServiceUtil}. */
+    private static RequestContainer submit(ExecutorService service, TestParams params) {
+      CallableTask task = new CallableTask(params);
+      return new RequestContainer(ExecutorServiceUtil.submit(service, task), task);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index cdd5dc9..7be43ab 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -510,7 +510,7 @@
                   This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                   </message>
-                  <maxsize>39000000</maxsize>
+                  <maxsize>39500000</maxsize>
                   <minsize>15000000</minsize>
                   <files>
                    <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -570,7 +570,7 @@
                           This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                         </message>
-                        <maxsize>37000000</maxsize>
+                        <maxsize>37500000</maxsize>
                         <minsize>15000000</minsize>
                         <files>
                           <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>