You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/10/08 20:52:50 UTC

[GitHub] ilooner closed pull request #1497: DRILL-6410: Fixed memory leak in flat Parquet reader

ilooner closed pull request #1497: DRILL-6410: Fixed memory leak in flat Parquet reader
URL: https://github.com/apache/drill/pull/1497
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 1ffde31630a..e429fb63a7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -33,6 +33,7 @@
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
 import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -133,7 +134,7 @@ protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
-      asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+      asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
@@ -230,7 +231,7 @@ protected void nextInternal() throws IOException {
           //if the queue was full before we took a page out, then there would
           // have been no new read tasks scheduled. In that case, schedule a new read.
           if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-            asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+            asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
           }
         }
       } finally {
@@ -264,7 +265,7 @@ protected void nextInternal() throws IOException {
             //if the queue was full before we took a page out, then there would
             // have been no new read tasks scheduled. In that case, schedule a new read.
             if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-              asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+              asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
             }
           }
           pageHeader = readStatus.getPageHeader();
@@ -456,7 +457,7 @@ public Void call() throws IOException {
           // if the queue is not full, schedule another read task immediately. If it is then the consumer
           // will schedule a new read task as soon as it removes a page from the queue.
           if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
-            asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
+            asyncPageRead.offer(ExecutorServiceUtil.submit(parent.threadPool, new AsyncPageReaderTask(debugName, queue)));
           }
         }
         // Do nothing.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java
new file mode 100644
index 00000000000..163b6baf475
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.concurrent;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/** Utility class to enhance the Java {@link ExecutorService} class functionality */
+public final class ExecutorServiceUtil {
+
+  /**
+   * Helper method to submit the callable task, get the original future object, and wrap it
+   * in another future object with the ability to decorate the {@link Future#cancel(boolean)} method;
+   * this decorator will block when future cancellation is invoked (and the "mayInterruptIfRunning"
+   * parameter is set to true).
+   *
+   * @param service the executor service
+   * @param callable a callable task
+   *
+   * @return decorated future object upon successful submission
+   * @see ExecutorService#submit(Callable)
+   */
+  public static <T> Future<T> submit(ExecutorService service, Callable<T> callable) {
+    // Wrap the original callable object
+    CallableTaskWrapper<T> wrapper = new CallableTaskWrapper<T>(callable);
+    // Submit the wrapper object and set the original future object within the wrapper; the task may
+    // already be running by the time the future is set, which the wrapper tolerates
+    wrapper.setFuture(service.submit(wrapper));
+
+    return wrapper;
+  }
+
+  /** Executor task wrapper to enhance task cancellation behavior */
+  public static final class CallableTaskWrapper<T> implements Callable<T>, Future<T> {
+    /** Callable object */
+    private final Callable<T> callableTask;
+    /** Future object returned after submission of the callable task */
+    private volatile Future<T> future;
+    /** Captures the callable task execution status; leaving NOT_RUNNING is done under the monitor */
+    private volatile STATE state = STATE.NOT_RUNNING;
+    /** Monitor object */
+    private final Object monitor = new Object();
+
+    /** Captures task's execution state */
+    private enum STATE {
+      NOT_RUNNING,
+      RUNNING,
+      DONE
+    }
+
+    /**
+     * CTOR.
+     * @param callableTask original callable task
+     */
+    public CallableTaskWrapper(Callable<T> callableTask) {
+      this.callableTask = callableTask;
+      Preconditions.checkNotNull(this.callableTask);
+    }
+
+    /**
+     * Runs the wrapped callable unless the task was already cancelled before it got the chance to start.
+     * <p>
+     * The NOT_RUNNING to RUNNING transition is made under the monitor so that it cannot interleave with
+     * {@link #cancel(boolean)} marking a not yet started task as DONE: either this task is seen as RUNNING
+     * (and a "mayInterruptIfRunning" cancellation waits for it), or the wrapped callable is never invoked.
+     *
+     * @return the wrapped callable's result, or null when the task was cancelled before starting (the
+     *         executor's future is already cancelled at that point and discards this value)
+     * @throws Exception whatever the wrapped callable throws
+     */
+    @Override
+    public T call() throws Exception {
+      synchronized (monitor) {
+        if (state != STATE.NOT_RUNNING) {
+          // Cancelled before it started; the canceller did not wait for us, so do not run behind its back
+          return null;
+        }
+        state = STATE.RUNNING;
+      }
+
+      try {
+        return callableTask.call();
+      } finally {
+        state = STATE.DONE;
+
+        // Optimization: no need to notify if the state is not "cancelled"
+        if (isCancelled()) {
+          synchronized (monitor) {
+            monitor.notifyAll();
+          }
+        }
+      }
+    }
+
+    /**
+     * Cancels the task; when the "mayInterruptIfRunning" flag is set, this method additionally blocks while the
+     * callable thread is still executing and only returns once it is done executing.
+     * <ul>
+     * <li>If the current thread gets interrupted while waiting, it keeps waiting; no exception is thrown and
+     * the interrupted flag is set again before returning</li>
+     * <li>A task successfully cancelled before it started is marked as done and will never run</li>
+     * </ul>
+     *
+     * @param mayInterruptIfRunning whether the executing thread should be interrupted and waited for
+     * @return the outcome of the underlying {@link Future#cancel(boolean)} call
+     * @see Future#cancel(boolean)
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      try {
+        final boolean cancelled = future.cancel(mayInterruptIfRunning);
+
+        if (cancelled) {
+          synchronized (monitor) {
+            // The wrapped callable never started (and now never will), so the task is done
+            if (state == STATE.NOT_RUNNING) {
+              state = STATE.DONE;
+            }
+          }
+        }
+        return cancelled;
+      } finally {
+        // If this thread wishes immediate completion of the task and was interrupted (because it was still running),
+        // then block this thread till the callable task is done executing.
+        if (mayInterruptIfRunning) {
+          waitTillDone();
+        }
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isCancelled() {
+      // this method could be called from the call() API before the future is set
+      return future != null && future.isCancelled();
+    }
+
+    /**
+     * @return true if the task has completed execution, or was cancelled before it could start
+     */
+    @Override
+    public boolean isDone() {
+      return state == STATE.DONE;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      return future.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return future.get(timeout, unit);
+    }
+
+    /**
+     * @param future the future returned by the executor service for this wrapper
+     */
+    public void setFuture(Future<T> future) {
+      this.future = future;
+    }
+
+    /** Blocks while the callable is running; interrupts are remembered and re-asserted, not honored. */
+    private void waitTillDone() {
+
+      if (isRunning()) {
+        // Save the current interrupted flag and clear it to allow wait operations
+        boolean interrupted = Thread.interrupted();
+
+        try {
+          synchronized (monitor) {
+            while (isRunning()) {
+              try {
+                monitor.wait();
+              } catch (InterruptedException e) {
+                // Keep waiting: returning early would let the task outlive the cancellation
+                interrupted = true;
+              }
+            }
+          }
+        } finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    }
+
+    private boolean isRunning() {
+      return state == STATE.RUNNING;
+    }
+  }
+
+// ----------------------------------------------------------------------------
+// Local implementation
+// ----------------------------------------------------------------------------
+
+  /** Disabling object instantiation */
+  private ExecutorServiceUtil() {
+
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java
new file mode 100644
index 00000000000..e2101adafef
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
+import org.apache.drill.test.DrillTest;
+import org.junit.Test;
+
+/** Tests for validating the Drill executor service utility class */
+public final class ExecutorServiceUtilTest extends DrillTest {
+
+  @Test
+  public void testSuccessfulExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+    List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+    // Set the test parameters (using the default values)
+    TestParams params = new TestParams();
+
+    // Launch the tasks
+    for (int idx = 0; idx < numTasks; idx++) {
+      CallableTask task = new CallableTask(params);
+      Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
+
+      requests.add(new RequestContainer(future, task));
+    }
+
+    int numSuccess  = 0;
+
+    // Wait for the tasks to finish
+    for (int idx = 0; idx < numTasks; idx++) {
+      RequestContainer request = requests.get(idx);
+
+      try {
+        TaskResult result = request.future.get();
+        assertNotNull(result);
+
+        if (result.isSuccess()) {
+          ++numSuccess;
+        }
+      } catch (Exception e) {
+        // NOOP
+      }
+    }
+
+    assertEquals(numTasks, numSuccess);
+  }
+
+  @Test
+  public void testFailedExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+    List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+    // Set the test parameters
+    TestParams params        = new TestParams();
+    params.generateException = true;
+
+    // Launch the tasks
+    for (int idx = 0; idx < numTasks; idx++) {
+      CallableTask task = new CallableTask(params);
+      Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
+
+      requests.add(new RequestContainer(future, task));
+    }
+
+    int numSuccess = 0;
+    int numFailures = 0;
+
+    // Wait for the tasks to finish
+    for (int idx = 0; idx < numTasks; idx++) {
+      RequestContainer request = requests.get(idx);
+
+      try {
+        TaskResult result = request.future.get();
+        assertNotNull(result);
+
+        if (result.isSuccess()) {
+          ++numSuccess;
+        }
+      } catch (Exception e) {
+        assertTrue(request.task.result.isFailed());
+        ++numFailures;
+      }
+    }
+
+    assertEquals(0, numSuccess);
+    assertEquals(numTasks, numFailures);
+  }
+
+  @Test
+  public void testMixedExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+    List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+    // Set the test parameters
+    TestParams successParams = new TestParams();
+    TestParams failedParams = new TestParams();
+    failedParams.generateException = true;
+
+    int expNumFailedTasks = 0;
+    int expNumSuccessTasks = 0;
+
+    // Launch the tasks
+    for (int idx = 0; idx < numTasks; idx++) {
+      CallableTask task = null;
+
+      if (idx % 2 == 0) {
+        task = new CallableTask(successParams);
+        ++expNumSuccessTasks;
+      } else {
+        task = new CallableTask(failedParams);
+        ++expNumFailedTasks;
+      }
+
+      Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
+      requests.add(new RequestContainer(future, task));
+    }
+
+    int numSuccess = 0;
+    int numFailures = 0;
+
+    // Wait for the tasks to finish
+    for (int idx = 0; idx < numTasks; idx++) {
+      RequestContainer request = requests.get(idx);
+
+      try {
+        TaskResult result = request.future.get();
+        assertNotNull(result);
+
+        if (result.isSuccess()) {
+          ++numSuccess;
+        }
+      } catch (Exception e) {
+        assertTrue(request.task.result.isFailed());
+        ++numFailures;
+      }
+    }
+
+    assertEquals(expNumSuccessTasks, numSuccess);
+    assertEquals(expNumFailedTasks, numFailures);
+  }
+
+  @Test
+  public void testCancelExecution() throws Exception {
+    final int numThreads = 2;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+    RequestContainer request = null;
+
+    // Set the test parameters
+    TestParams params = new TestParams();
+    params.controller = new TaskExecutionController();
+
+    // Launch the task
+    CallableTask task = new CallableTask(params);
+    Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
+    request = new RequestContainer(future, task);
+
+    // Allow the task to start
+    params.controller.start();
+    params.controller.hasStarted();
+
+    // Allow the task to exit but with a delay so that we can test the blocking nature of "cancel"
+    params.controller.delayMillisOnExit = 50;
+    params.controller.exit();
+
+    // Cancel the task
+    boolean result = request.future.cancel(true);
+
+    if (result) {
+      // We were able to cancel this task; let's make sure that it is done now that the current thread is
+      // unblocked
+      assertTrue(task.result.isCancelled());
+
+    } else {
+      // Cancellation could't happen most probably because this thread got context switched for
+      // for a long time (should be rare); let's make sure the task is done and successful
+      assertTrue(task.result.isSuccess());
+    }
+  }
+
+
+
+// ----------------------------------------------------------------------------
+// Internal Classes
+// ----------------------------------------------------------------------------
+
+  @SuppressWarnings("unused")
+  private static final class TaskResult {
+
+    private enum ExecutionStatus {
+      NOT_STARTED,
+      RUNNING,
+      SUCCEEDED,
+      FAILED,
+      CANCELLED
+    };
+
+    private ExecutionStatus status;
+
+    TaskResult() {
+      status = ExecutionStatus.NOT_STARTED;
+    }
+
+    private boolean isSuccess() {
+      return status.equals(ExecutionStatus.SUCCEEDED);
+    }
+
+    private boolean isFailed() {
+      return status.equals(ExecutionStatus.FAILED);
+    }
+
+    private boolean isCancelled() {
+      return status.equals(ExecutionStatus.CANCELLED);
+    }
+
+    private boolean isFailedOrCancelled() {
+      return status.equals(ExecutionStatus.CANCELLED)
+        || status.equals(ExecutionStatus.FAILED);
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private static final class TaskExecutionController {
+    private boolean canStart = false;
+    private boolean canExit = false;
+    private boolean started = false;
+    private boolean exited = false;
+    private int delayMillisOnExit = 0;
+    private Object monitor = new Object();
+
+    private void canStart() {
+      synchronized(monitor) {
+        while (!canStart) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+        started = true;
+        monitor.notify();
+      }
+    }
+
+    private void canExit() {
+      synchronized(monitor) {
+        while (!canExit) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+      }
+
+      // Wait requested delay time before exiting
+      for (int i = 0; i < delayMillisOnExit; i++) {
+        try {
+          Thread.sleep(1); // sleep 1 ms
+        } catch (InterruptedException ie) {
+          // NOOP
+        }
+      }
+
+      synchronized(monitor) {
+        exited = true;
+        monitor.notify();
+      }
+    }
+
+    private void start() {
+      synchronized(monitor) {
+        canStart = true;
+        monitor.notify();
+      }
+    }
+
+    private void exit() {
+      synchronized(monitor) {
+        canExit = true;
+        monitor.notify();
+      }
+    }
+
+    private void hasStarted() {
+      synchronized(monitor) {
+        while (!started) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+      }
+    }
+
+    private void hasExited() {
+      synchronized(monitor) {
+        while (!exited) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+      }
+    }
+
+  }
+
+  private static final class TestParams {
+    private int waitTimeMillis = 2;
+    private boolean generateException = false;
+    private TaskExecutionController controller = null;
+  }
+
+  private static final class CallableTask implements Callable<TaskResult> {
+    private volatile TaskResult result = new TaskResult();
+    private final TestParams params;
+
+    private CallableTask(TestParams params) {
+      this.params = params;
+    }
+
+    @Override
+    public TaskResult call() throws Exception {
+
+      beforeStart();
+
+      result.status = TaskResult.ExecutionStatus.RUNNING;
+      boolean interrupted = false;
+      Exception exc = null;
+
+      try {
+        for (int i = 0; i < params.waitTimeMillis; i++) {
+          try {
+            Thread.sleep(1); // sleep 1 ms
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+        }
+
+        if (params.generateException) {
+          throw new RuntimeException("Test emulated exception..");
+        }
+
+      } catch (Exception e) {
+        exc = e;
+        throw e;
+
+      } finally {
+        beforeExit();
+
+        if (interrupted) {
+          result.status = TaskResult.ExecutionStatus.CANCELLED;
+        } else if (exc != null) {
+          result.status = TaskResult.ExecutionStatus.FAILED;
+        } else {
+          result.status = TaskResult.ExecutionStatus.SUCCEEDED;
+        }
+      }
+      return result;
+    }
+
+    private void beforeStart() {
+      if (params.controller != null) {
+        params.controller.canStart();
+      }
+    }
+
+    private void beforeExit() {
+      if (params.controller != null) {
+        params.controller.canExit();
+      }
+    }
+  }
+
+  private static final class RequestContainer {
+    private final Future<TaskResult> future;
+    private final CallableTask task;
+
+    private RequestContainer(Future<TaskResult> future, CallableTask task) {
+      this.future = future;
+      this.task   = task;
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 7a6c9eeb0f8..98d76fc9c22 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -511,7 +511,7 @@
                   This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                   </message>
-                  <maxsize>39000000</maxsize>
+                  <maxsize>39500000</maxsize>
                   <minsize>15000000</minsize>
                   <files>
                    <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -571,7 +571,7 @@
                           This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
 
                         </message>
-                        <maxsize>37000000</maxsize>
+                        <maxsize>37500000</maxsize>
                         <minsize>15000000</minsize>
                         <files>
                           <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services