You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/10/08 20:52:54 UTC
[drill] branch master updated: DRILL-6410: Fixed memory leak in
flat Parquet reader
This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 55ac523 DRILL-6410: Fixed memory leak in flat Parquet reader
55ac523 is described below
commit 55ac52390877dcaa4a811f570d1eb41aca240bf0
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Mon Oct 8 10:16:44 2018 -0700
DRILL-6410: Fixed memory leak in flat Parquet reader
---
.../parquet/columnreaders/AsyncPageReader.java | 9 +-
.../exec/util/concurrent/ExecutorServiceUtil.java | 194 ++++++++++
.../drill/exec/util/ExecutorServiceUtilTest.java | 419 +++++++++++++++++++++
exec/jdbc-all/pom.xml | 4 +-
4 files changed, 620 insertions(+), 6 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 1ffde31..e429fb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
import org.apache.drill.exec.util.filereader.DirectBufInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -133,7 +134,7 @@ class AsyncPageReader extends PageReader {
super.init();
//Avoid Init if a shutdown is already in progress even if init() is called once
if (!parentColumnReader.isShuttingDown) {
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
}
}
@@ -230,7 +231,7 @@ class AsyncPageReader extends PageReader {
//if the queue was full before we took a page out, then there would
// have been no new read tasks scheduled. In that case, schedule a new read.
if (!parentColumnReader.isShuttingDown && pageQueueFull) {
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
}
}
} finally {
@@ -264,7 +265,7 @@ class AsyncPageReader extends PageReader {
//if the queue was full before we took a page out, then there would
// have been no new read tasks scheduled. In that case, schedule a new read.
if (!parentColumnReader.isShuttingDown && pageQueueFull) {
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
}
}
pageHeader = readStatus.getPageHeader();
@@ -456,7 +457,7 @@ class AsyncPageReader extends PageReader {
// if the queue is not full, schedule another read task immediately. If it is then the consumer
// will schedule a new read task as soon as it removes a page from the queue.
if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
- asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
+ asyncPageRead.offer(ExecutorServiceUtil.submit(parent.threadPool, new AsyncPageReaderTask(debugName, queue)));
}
}
// Do nothing.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java
new file mode 100644
index 0000000..163b6ba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/concurrent/ExecutorServiceUtil.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.concurrent;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utility class to enhance the Java {@link ExecutorService} class functionality; mainly, it provides
+ * futures whose cancellation blocks until the underlying task has stopped executing.
+ */
+public final class ExecutorServiceUtil {
+
+  /**
+   * Submits the callable task, then wraps the returned future within a decorator that enhances the
+   * {@link Future#cancel(boolean)} method: when the "mayInterruptIfRunning" parameter is true, the
+   * cancel call blocks until the callable task has stopped executing. This allows callers to safely
+   * release resources the task might otherwise still be accessing.
+   *
+   * @param service the executor service
+   * @param callable a callable task
+   * @param <T> the result type of the callable task
+   * @return decorated future object upon successful submission
+   * @see ExecutorService#submit(Callable)
+   */
+  public static <T> Future<T> submit(ExecutorService service, Callable<T> callable) {
+    // Wrap the original callable object
+    final CallableTaskWrapper<T> wrapper = new CallableTaskWrapper<T>(callable);
+    // Submit the wrapper object and set the original future object within the wrapper
+    wrapper.setFuture(service.submit(wrapper));
+
+    return wrapper;
+  }
+
+  /** Executor task wrapper to enhance task cancellation behavior */
+  public static final class CallableTaskWrapper<T> implements Callable<T>, Future<T> {
+    /** Callable object */
+    private final Callable<T> callableTask;
+    /** Future object returned after submission of this wrapper to the executor service */
+    private volatile Future<T> future;
+    /** Captures the callable task execution status; updated only while holding the monitor */
+    private volatile STATE state = STATE.NOT_RUNNING;
+    /** Monitor object guarding state transitions and signaling task completion */
+    private final Object monitor = new Object();
+
+    /** Captures task's execution state */
+    private enum STATE {
+      NOT_RUNNING,
+      RUNNING,
+      DONE
+    }
+
+    /**
+     * CTOR.
+     * @param callableTask original callable task
+     */
+    public CallableTaskWrapper(Callable<T> callableTask) {
+      this.callableTask = Preconditions.checkNotNull(callableTask);
+    }
+
+    /**
+     * Runs the original callable task unless this future has already been cancelled.
+     * <p>
+     * The cancellation check and the transition to the RUNNING state happen atomically with respect
+     * to {@link #cancel(boolean)}; otherwise, a concurrent cancel could observe the NOT_RUNNING state
+     * and return while the callable task is about to start.
+     *
+     * @return the callable task result, or null when the task was cancelled before starting (the
+     *         executor's future discards the result of a cancelled task anyway)
+     * @throws Exception any exception thrown by the callable task
+     */
+    @Override
+    public T call() throws Exception {
+      synchronized (monitor) {
+        if (isCancelled()) {
+          state = STATE.DONE;
+          return null;
+        }
+        state = STATE.RUNNING;
+      }
+
+      try {
+        return callableTask.call();
+      } finally {
+        // Always signal while holding the monitor so that a waiter cannot miss the DONE transition
+        synchronized (monitor) {
+          state = STATE.DONE;
+          monitor.notifyAll();
+        }
+      }
+    }
+
+    /**
+     * Attempts to cancel the task; see {@link Future#cancel(boolean)}.
+     * <p>
+     * When the "mayInterruptIfRunning" flag is set, this method also blocks until the callable task is
+     * no longer executing. Interrupting the waiting thread does not end the wait: no exception is thrown
+     * and the thread's interrupted status is restored before this method returns.
+     * <p>
+     * NOTE: invoking this method with "mayInterruptIfRunning" set from within the task itself would
+     * block forever, since the task cannot complete while it waits for itself.
+     *
+     * @param mayInterruptIfRunning whether the thread executing the task should be interrupted
+     * @return false if the task could not be cancelled (typically because it has already completed)
+     */
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      try {
+        return future.cancel(mayInterruptIfRunning);
+      } finally {
+        // Block till the callable task is done executing so that the caller can safely release any
+        // resource the task may still be using
+        if (mayInterruptIfRunning) {
+          waitTillDone();
+        }
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isCancelled() {
+      // this method could be called from the call() API before the future is set
+      return future != null && future.isCancelled();
+    }
+
+    /**
+     * @return true if the callable task has completed execution, or if the task was cancelled and the
+     *         callable task is not (and will never be) running; a cancelled task that is still
+     *         executing is not reported as done
+     */
+    @Override
+    public boolean isDone() {
+      synchronized (monitor) {
+        return state == STATE.DONE || (state == STATE.NOT_RUNNING && isCancelled());
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      return future.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return future.get(timeout, unit);
+    }
+
+    /**
+     * @param future the future object returned by the executor service upon submitting this wrapper
+     */
+    public void setFuture(Future<T> future) {
+      this.future = future;
+    }
+
+    /** Blocks the current thread while the callable task is running; interrupts do not end the wait */
+    private void waitTillDone() {
+      // Save the current interrupted flag and clear it to allow wait operations
+      boolean interrupted = Thread.interrupted();
+
+      try {
+        // The state must be checked while holding the monitor; see call()
+        synchronized (monitor) {
+          while (isRunning()) {
+            try {
+              monitor.wait();
+            } catch (InterruptedException e) {
+              interrupted = true;
+            }
+          }
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    private boolean isRunning() {
+      return state == STATE.RUNNING;
+    }
+  }
+
+// ----------------------------------------------------------------------------
+// Local implementation
+// ----------------------------------------------------------------------------
+
+  /** Disabling object instantiation */
+  private ExecutorServiceUtil() {
+
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java
new file mode 100644
index 0000000..e2101ad
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/ExecutorServiceUtilTest.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
+import org.apache.drill.test.DrillTest;
+import org.junit.Test;
+
+/** Tests for validating the Drill executor service utility class */
+public final class ExecutorServiceUtilTest extends DrillTest {
+
+  @Test
+  public void testSuccessfulExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+      // Set the test parameters (using the default values)
+      TestParams params = new TestParams();
+
+      // Launch the tasks
+      for (int idx = 0; idx < numTasks; idx++) {
+        CallableTask task = new CallableTask(params);
+        Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
+
+        requests.add(new RequestContainer(future, task));
+      }
+
+      int numSuccess = 0;
+
+      // Wait for the tasks to finish; an unexpected task failure propagates along with its cause
+      for (RequestContainer request : requests) {
+        TaskResult result = request.future.get();
+        assertNotNull(result);
+
+        if (result.isSuccess()) {
+          ++numSuccess;
+        }
+      }
+
+      assertEquals(numTasks, numSuccess);
+    } finally {
+      // Release the pool threads so that they do not leak across tests
+      service.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testFailedExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+      // Set the test parameters
+      TestParams params = new TestParams();
+      params.generateException = true;
+
+      // Launch the tasks
+      for (int idx = 0; idx < numTasks; idx++) {
+        CallableTask task = new CallableTask(params);
+        Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
+
+        requests.add(new RequestContainer(future, task));
+      }
+
+      int numSuccess = 0;
+      int numFailures = 0;
+
+      // Wait for the tasks to finish
+      for (RequestContainer request : requests) {
+        try {
+          TaskResult result = request.future.get();
+          assertNotNull(result);
+
+          if (result.isSuccess()) {
+            ++numSuccess;
+          }
+        } catch (ExecutionException e) {
+          // The task threw; its result must have recorded the failure
+          assertTrue(request.task.result.isFailed());
+          ++numFailures;
+        }
+      }
+
+      assertEquals(0, numSuccess);
+      assertEquals(numTasks, numFailures);
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testMixedExecution() throws Exception {
+    final int numThreads = 2;
+    final int numTasks = 20;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      List<RequestContainer> requests = new ArrayList<>(numTasks);
+
+      // Set the test parameters
+      TestParams successParams = new TestParams();
+      TestParams failedParams = new TestParams();
+      failedParams.generateException = true;
+
+      int expNumFailedTasks = 0;
+      int expNumSuccessTasks = 0;
+
+      // Launch the tasks; even indexes succeed while odd ones fail
+      for (int idx = 0; idx < numTasks; idx++) {
+        CallableTask task;
+
+        if (idx % 2 == 0) {
+          task = new CallableTask(successParams);
+          ++expNumSuccessTasks;
+        } else {
+          task = new CallableTask(failedParams);
+          ++expNumFailedTasks;
+        }
+
+        Future<TaskResult> future = ExecutorServiceUtil.submit(service, task);
+        requests.add(new RequestContainer(future, task));
+      }
+
+      int numSuccess = 0;
+      int numFailures = 0;
+
+      // Wait for the tasks to finish
+      for (RequestContainer request : requests) {
+        try {
+          TaskResult result = request.future.get();
+          assertNotNull(result);
+
+          if (result.isSuccess()) {
+            ++numSuccess;
+          }
+        } catch (ExecutionException e) {
+          assertTrue(request.task.result.isFailed());
+          ++numFailures;
+        }
+      }
+
+      assertEquals(expNumSuccessTasks, numSuccess);
+      assertEquals(expNumFailedTasks, numFailures);
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testCancelExecution() throws Exception {
+    final int numThreads = 2;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+
+    try {
+      // Set the test parameters
+      TestParams params = new TestParams();
+      params.controller = new TaskExecutionController();
+
+      // Launch the task
+      CallableTask task = new CallableTask(params);
+      RequestContainer request = new RequestContainer(ExecutorServiceUtil.submit(service, task), task);
+
+      // Allow the task to start
+      params.controller.start();
+      params.controller.hasStarted();
+
+      // Allow the task to exit but with a delay so that we can test the blocking nature of "cancel"
+      params.controller.delayMillisOnExit = 50;
+      params.controller.exit();
+
+      // Cancel the task
+      boolean result = request.future.cancel(true);
+
+      if (result) {
+        // We were able to cancel this task; since "cancel" blocks till the task is done executing, the task
+        // must have completed by now. Whether it observed the interrupt depends on where the task was when
+        // interrupted (the controller swallows interrupts), so both outcomes are valid.
+        assertTrue(request.future.isCancelled());
+        assertTrue(task.result.isCancelled() || task.result.isSuccess());
+      } else {
+        // Cancellation couldn't happen most probably because this thread got context switched
+        // for a long time (should be rare); let's make sure the task is done and successful
+        assertTrue(task.result.isSuccess());
+      }
+    } finally {
+      service.shutdownNow();
+    }
+  }
+
+// ----------------------------------------------------------------------------
+// Internal Classes
+// ----------------------------------------------------------------------------
+
+  @SuppressWarnings("unused")
+  private static final class TaskResult {
+
+    private enum ExecutionStatus {
+      NOT_STARTED,
+      RUNNING,
+      SUCCEEDED,
+      FAILED,
+      CANCELLED
+    }
+
+    private ExecutionStatus status;
+
+    TaskResult() {
+      status = ExecutionStatus.NOT_STARTED;
+    }
+
+    private boolean isSuccess() {
+      return status == ExecutionStatus.SUCCEEDED;
+    }
+
+    private boolean isFailed() {
+      return status == ExecutionStatus.FAILED;
+    }
+
+    private boolean isCancelled() {
+      return status == ExecutionStatus.CANCELLED;
+    }
+
+    private boolean isFailedOrCancelled() {
+      return isFailed() || isCancelled();
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private static final class TaskExecutionController {
+    private boolean canStart = false;
+    private boolean canExit = false;
+    private boolean started = false;
+    private boolean exited = false;
+    private int delayMillisOnExit = 0;
+    private final Object monitor = new Object();
+
+    private void canStart() {
+      synchronized (monitor) {
+        while (!canStart) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+        started = true;
+        monitor.notifyAll();
+      }
+    }
+
+    private void canExit() {
+      synchronized (monitor) {
+        while (!canExit) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+      }
+
+      // Wait requested delay time before exiting
+      for (int i = 0; i < delayMillisOnExit; i++) {
+        try {
+          Thread.sleep(1); // sleep 1 ms
+        } catch (InterruptedException ie) {
+          // NOOP
+        }
+      }
+
+      synchronized (monitor) {
+        exited = true;
+        monitor.notifyAll();
+      }
+    }
+
+    private void start() {
+      synchronized (monitor) {
+        canStart = true;
+        monitor.notifyAll();
+      }
+    }
+
+    private void exit() {
+      synchronized (monitor) {
+        canExit = true;
+        monitor.notifyAll();
+      }
+    }
+
+    private void hasStarted() {
+      synchronized (monitor) {
+        while (!started) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+      }
+    }
+
+    private void hasExited() {
+      synchronized (monitor) {
+        while (!exited) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ie) {
+            // NOOP
+          }
+        }
+      }
+    }
+  }
+
+  private static final class TestParams {
+    private int waitTimeMillis = 2;
+    private boolean generateException = false;
+    private TaskExecutionController controller = null;
+  }
+
+  private static final class CallableTask implements Callable<TaskResult> {
+    private volatile TaskResult result = new TaskResult();
+    private final TestParams params;
+
+    private CallableTask(TestParams params) {
+      this.params = params;
+    }
+
+    @Override
+    public TaskResult call() throws Exception {
+      beforeStart();
+
+      result.status = TaskResult.ExecutionStatus.RUNNING;
+      boolean interrupted = false;
+      Exception exc = null;
+
+      try {
+        for (int i = 0; i < params.waitTimeMillis; i++) {
+          try {
+            Thread.sleep(1); // sleep 1 ms
+          } catch (InterruptedException ie) {
+            interrupted = true;
+          }
+        }
+
+        if (params.generateException) {
+          throw new RuntimeException("Test emulated exception..");
+        }
+      } catch (Exception e) {
+        exc = e;
+        throw e;
+      } finally {
+        beforeExit();
+
+        if (interrupted) {
+          result.status = TaskResult.ExecutionStatus.CANCELLED;
+        } else if (exc != null) {
+          result.status = TaskResult.ExecutionStatus.FAILED;
+        } else {
+          result.status = TaskResult.ExecutionStatus.SUCCEEDED;
+        }
+      }
+      return result;
+    }
+
+    private void beforeStart() {
+      if (params.controller != null) {
+        params.controller.canStart();
+      }
+    }
+
+    private void beforeExit() {
+      if (params.controller != null) {
+        params.controller.canExit();
+      }
+    }
+  }
+
+  private static final class RequestContainer {
+    private final Future<TaskResult> future;
+    private final CallableTask task;
+
+    private RequestContainer(Future<TaskResult> future, CallableTask task) {
+      this.future = future;
+      this.task = task;
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index cdd5dc9..7be43ab 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -510,7 +510,7 @@
This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>39000000</maxsize>
+ <maxsize>39500000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -570,7 +570,7 @@
This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>37000000</maxsize>
+ <maxsize>37500000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>