You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/05/19 23:16:58 UTC
reef git commit: [REEF-1393] Consider active threads when checking
applicationDispatcher.isEmpty
Repository: reef
Updated Branches:
refs/heads/master 326eae21a -> 2b032719f
[REEF-1393] Consider active threads when checking applicationDispatcher.isEmpty
This addressed the issue by
* Adding active threads to consideration in applicationDispatcher.isEmpty.
* Adding a ThreadPool to check for the exit of Evaluator handlers and to
trigger a check for the idleness of the Evaluator.
JIRA:
[REEF-1393](https://issues.apache.org/jira/browse/REEF-1393)
Pull Request:
This closes #1007
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/2b032719
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/2b032719
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/2b032719
Branch: refs/heads/master
Commit: 2b032719f0c28fafd83d225b60b1f9ff89e9c6ec
Parents: 326eae2
Author: Andrew Chung <af...@gmail.com>
Authored: Wed May 18 11:38:42 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu May 19 16:15:03 2016 -0700
----------------------------------------------------------------------
.../apache/reef/client/DriverConfiguration.java | 18 +++++
.../runtime/common/driver/DriverSingletons.java | 5 +-
.../evaluator/EvaluatorIdlenessThreadPool.java | 80 ++++++++++++++++++++
.../driver/evaluator/EvaluatorManager.java | 19 +++--
.../EvaluatorIdlenessThreadPoolSize.java | 33 ++++++++
.../EvaluatorIdlenessWaitInMilliseconds.java | 33 ++++++++
.../runtime/common/utils/DispatchingEStage.java | 2 +-
.../apache/reef/wake/impl/ThreadPoolStage.java | 45 +++++++----
8 files changed, 211 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
index a566475..6edb52f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverConfiguration.java
@@ -32,6 +32,8 @@ import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.task.*;
import org.apache.reef.runtime.common.driver.DriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessThreadPoolSize;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessWaitInMilliseconds;
import org.apache.reef.tang.formats.*;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.Clock;
@@ -177,6 +179,8 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
*/
public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
+ // ***** MISC
+
/**
* Progress provider. See {@link ProgressProvider}.
*/
@@ -193,6 +197,18 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
public static final OptionalParameter<Integer> MAX_APPLICATION_SUBMISSIONS = new OptionalParameter<>();
/**
+ * The number of Threads in a Driver to verify the completion of Evaluators.
+ * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+ */
+ public static final OptionalParameter<Integer> EVALUATOR_IDLENESS_THREAD_POOL_SIZE = new OptionalParameter<>();
+
+ /**
+ * The number of Threads in a Driver to verify the completion of Evaluators.
+ * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+ */
+ public static final OptionalParameter<Long> EVALUATOR_IDLENESS_WAIT_IN_MS = new OptionalParameter<>();
+
+ /**
* ConfigurationModule to fill out to get a legal Driver Configuration.
*/
public static final ConfigurationModule CONF = new DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
@@ -235,6 +251,8 @@ public final class DriverConfiguration extends ConfigurationModuleBuilder {
// Various parameters
.bindNamedParameter(EvaluatorDispatcherThreads.class, EVALUATOR_DISPATCHER_THREADS)
+ .bindNamedParameter(EvaluatorIdlenessThreadPoolSize.class, EVALUATOR_IDLENESS_THREAD_POOL_SIZE)
+ .bindNamedParameter(EvaluatorIdlenessWaitInMilliseconds.class, EVALUATOR_IDLENESS_WAIT_IN_MS)
.bindImplementation(ProgressProvider.class, PROGRESS_PROVIDER)
.build();
}
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
index 7b21f9a..8c31e05 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverSingletons.java
@@ -32,6 +32,7 @@ import org.apache.reef.driver.task.*;
import org.apache.reef.proto.ClientRuntimeProtocol;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EventHandler;
@@ -98,6 +99,8 @@ final class DriverSingletons {
// we get container failures dure to modifications
// to already submitted global jar file
final ResourceLaunchHandler resourceLaunchHandler,
- final ResourceReleaseHandler resourceReleaseHandler) {
+ final ResourceReleaseHandler resourceReleaseHandler,
+
+ final EvaluatorIdlenessThreadPool evaluatorIdlenessThreadPool) {
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
new file mode 100644
index 0000000..e94ab4c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorIdlenessThreadPool.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessThreadPoolSize;
+import org.apache.reef.runtime.common.driver.parameters.EvaluatorIdlenessWaitInMilliseconds;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.impl.DefaultThreadFactory;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Runs threads in a thread pool to check the completion of Evaluators on the closing
+ * of an {@link EvaluatorManager} in order to trigger Evaluator idleness checks.
+ */
+@Private
+public final class EvaluatorIdlenessThreadPool {
+ private static final Logger LOG = Logger.getLogger(EvaluatorIdlenessThreadPool.class.getName());
+
+ private final ExecutorService executor;
+ private final long waitInMillis;
+
+ @Inject
+ private EvaluatorIdlenessThreadPool(@Parameter(EvaluatorIdlenessThreadPoolSize.class) final int numThreads,
+ @Parameter(EvaluatorIdlenessWaitInMilliseconds.class) final long waitInMillis) {
+
+ Validate.isTrue(waitInMillis >= 0, "EvaluatorIdlenessWaitInMilliseconds must be configured to be >= 0");
+ Validate.isTrue(numThreads > 0, "EvaluatorIdlenessThreadPoolSize must be configured to be > 0");
+
+ this.waitInMillis = waitInMillis;
+ this.executor = Executors.newFixedThreadPool(
+ numThreads, new DefaultThreadFactory(EvaluatorIdlenessThreadPool.class.getName()));
+ }
+
+ /**
+ * Runs a check in the ThreadPool for the {@link EvaluatorManager} to wait for it to finish its
+ * Event Handling and check its idleness source.
+ * @param manager the {@link EvaluatorManager}
+ */
+ void runCheckAsync(final EvaluatorManager manager) {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ while (!manager.isClosed()) {
+ try {
+ Thread.sleep(waitInMillis);
+ } catch (final InterruptedException e) {
+ LOG.log(Level.SEVERE, "Thread interrupted while waiting for Evaluator to finish.");
+ throw new RuntimeException(e);
+ }
+ }
+
+ manager.checkIdlenessSource();
+ LOG.log(Level.FINE, "Evaluator " + manager.getId() + " has finished.");
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index f483868..a8bd61a 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -107,6 +107,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
private final LoggingScopeFactory loggingScopeFactory;
private final Set<ConfigurationProvider> evaluatorConfigurationProviders;
private final DriverRestartManager driverRestartManager;
+ private final EvaluatorIdlenessThreadPool idlenessThreadPool;
// Mutable fields
private Optional<TaskRepresenter> task = Optional.empty();
@@ -132,7 +133,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
final LoggingScopeFactory loggingScopeFactory,
@Parameter(EvaluatorConfigurationProviders.class)
final Set<ConfigurationProvider> evaluatorConfigurationProviders,
- final DriverRestartManager driverRestartManager) {
+ final DriverRestartManager driverRestartManager,
+ final EvaluatorIdlenessThreadPool idlenessThreadPool) {
this.contextRepresenters = contextRepresenters;
this.idlenessSource = idlenessSource;
LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
@@ -153,6 +155,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
this.loggingScopeFactory = loggingScopeFactory;
this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
this.driverRestartManager = driverRestartManager;
+ this.idlenessThreadPool = idlenessThreadPool;
LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
}
@@ -264,12 +267,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
}
}
- try {
- this.messageDispatcher.close();
- } catch (Exception e) {
- LOG.log(Level.SEVERE, "Exception while closing EvaluatorManager", e);
- }
- this.idlenessSource.check();
+ idlenessThreadPool.runCheckAsync(this);
}
/**
@@ -282,6 +280,13 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
}
/**
+ * Triggers a call to check the idleness of the Evaluator.
+ */
+ void checkIdlenessSource() {
+ this.idlenessSource.check();
+ }
+
+ /**
* EvaluatorException will trigger is FailedEvaluator and state transition to FAILED.
*
* @param exception on the EvaluatorRuntime
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java
new file mode 100644
index 0000000..0dce5c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessThreadPoolSize.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The number of Threads in a Driver to verify the completion of Evaluators.
+ * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+ */
+@NamedParameter(doc = "The number of Threads in a Driver to verify the completion of Evaluators.",
+ default_value = "5")
+public final class EvaluatorIdlenessThreadPoolSize implements Name<Integer> {
+ private EvaluatorIdlenessThreadPoolSize(){
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java
new file mode 100644
index 0000000..a0168c5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/parameters/EvaluatorIdlenessWaitInMilliseconds.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The number of Threads in a Driver to verify the completion of Evaluators.
+ * Used by {@link org.apache.reef.runtime.common.driver.evaluator.EvaluatorIdlenessThreadPool}.
+ */
+@NamedParameter(doc = "The number of milliseconds to wait in a loop to verify the completion of Evaluators.",
+ default_value = "500")
+public final class EvaluatorIdlenessWaitInMilliseconds implements Name<Long> {
+ private EvaluatorIdlenessWaitInMilliseconds(){
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index 36c0aee..03c1463 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -114,7 +114,7 @@ public final class DispatchingEStage implements AutoCloseable {
* Return true if there are no messages queued or in processing, false otherwise.
*/
public boolean isEmpty() {
- return this.stage.getQueueLength() == 0;
+ return this.stage.getQueueLength() + this.stage.getActiveCount() == 0;
}
/**
http://git-wip-us.apache.org/repos/asf/reef/blob/2b032719/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 3d46e37..3b39c8a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -174,24 +174,32 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> {
@SuppressWarnings("checkstyle:illegalcatch")
public void onNext(final T value) {
beforeOnNext();
- executor.submit(new Runnable() {
-
- @Override
- public void run() {
- try {
- handler.onNext(value);
- afterOnNext();
- } catch (final Throwable t) {
- if (errorHandler != null) {
- errorHandler.onNext(t);
- } else {
- LOG.log(Level.SEVERE, name + " Exception from event handler", t);
- throw t;
+ try {
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ handler.onNext(value);
+ } catch (final Throwable t) {
+ if (errorHandler != null) {
+ errorHandler.onNext(t);
+ } else {
+ LOG.log(Level.SEVERE, name + " Exception from event handler", t);
+ throw t;
+ }
+ } finally {
+ afterOnNext();
}
}
- }
- });
+ });
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Encountered error when submitting to executor in ThreadPoolStage.");
+ afterOnNext();
+ throw e;
+ }
+
}
/**
@@ -218,4 +226,11 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> {
return ((ThreadPoolExecutor) executor).getQueue().size();
}
+ /**
+ * Gets the active count of this stage.
+ * @return the active count
+ */
+ public int getActiveCount() {
+ return (int)(getInMeter().getCount() - getOutMeter().getCount());
+ }
}